diff --git a/build/params_2k.go b/build/params_2k.go
index 962b2f9ba45..8220ce8aaf8 100644
--- a/build/params_2k.go
+++ b/build/params_2k.go
@@ -6,6 +6,7 @@ package build
 import (
 	"os"
 	"strconv"
+	"time"
 
 	"github.com/ipfs/go-cid"
 
@@ -139,3 +140,7 @@ const BootstrapPeerThreshold = 1
 const Eip155ChainId = 31415926
 
 var WhitelistedBlock = cid.Undef
+
+// CBDeliveryDelay is the delivery delay used to detect equivocations in the
+// consistent broadcast. It is reduced to just half a second on this network.
+var CBDeliveryDelay = 500 * time.Millisecond
diff --git a/build/params_butterfly.go b/build/params_butterfly.go
index b00381b444b..4fdac1ec882 100644
--- a/build/params_butterfly.go
+++ b/build/params_butterfly.go
@@ -4,6 +4,8 @@
 package build
 
 import (
+	"time"
+
 	"github.com/ipfs/go-cid"
 
 	"github.com/filecoin-project/go-address"
@@ -87,3 +89,8 @@ const BootstrapPeerThreshold = 2
 const Eip155ChainId = 3141592
 
 var WhitelistedBlock = cid.Undef
+
+// CBDeliveryDelay is the delay before delivery in the synchronous consistent broadcast.
+// This determines the wait time for the detection of potential equivocations.
+// It is a variable instead of a constant so it can be conveniently configured in tests.
+var CBDeliveryDelay = 2 * time.Second
diff --git a/build/params_calibnet.go b/build/params_calibnet.go
index 32923f7a869..35ae1796c88 100644
--- a/build/params_calibnet.go
+++ b/build/params_calibnet.go
@@ -6,6 +6,7 @@ package build
 import (
 	"os"
 	"strconv"
+	"time"
 
 	"github.com/ipfs/go-cid"
 
@@ -122,3 +123,8 @@ const BootstrapPeerThreshold = 4
 const Eip155ChainId = 314159
 
 var WhitelistedBlock = cid.Undef
+
+// CBDeliveryDelay is the delay before delivery in the synchronous consistent broadcast.
+// This determines the wait time for the detection of potential equivocations.
+// It is a variable instead of a constant so it can be conveniently configured in tests.
+var CBDeliveryDelay = 2 * time.Second
diff --git a/build/params_interop.go b/build/params_interop.go
index 4d94de049c0..72cfdca3543 100644
--- a/build/params_interop.go
+++ b/build/params_interop.go
@@ -6,6 +6,7 @@ package build
 import (
 	"os"
 	"strconv"
+	"time"
 
 	"github.com/ipfs/go-cid"
 
@@ -128,3 +129,8 @@ const BootstrapPeerThreshold = 2
 const Eip155ChainId = 3141592
 
 var WhitelistedBlock = cid.Undef
+
+// CBDeliveryDelay is the delay before delivery in the synchronous consistent broadcast.
+// This determines the wait time for the detection of potential equivocations.
+// It is a variable instead of a constant so it can be conveniently configured in tests.
+var CBDeliveryDelay = 2 * time.Second
diff --git a/build/params_mainnet.go b/build/params_mainnet.go
index 453cdafafb7..d4cf6ff4b5f 100644
--- a/build/params_mainnet.go
+++ b/build/params_mainnet.go
@@ -7,6 +7,7 @@ import (
 	"math"
 	"os"
 	"strconv"
+	"time"
 
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-state-types/abi"
@@ -137,3 +138,8 @@ const Eip155ChainId = 314
 
 // we skip checks on message validity in this block to sidestep the zero-bls signature
 var WhitelistedBlock = MustParseCid("bafy2bzaceapyg2uyzk7vueh3xccxkuwbz3nxewjyguoxvhx77malc2lzn2ybi")
+
+// CBDeliveryDelay is the delay before delivery in the synchronous consistent broadcast.
+// This determines the wait time for the detection of potential equivocations.
+// It is a variable instead of a constant so it can be conveniently configured in tests.
+var CBDeliveryDelay = 2 * time.Second
diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go
new file mode 100644
index 00000000000..58e8bc98fd8
--- /dev/null
+++ b/chain/sub/bcast/consistent.go
@@ -0,0 +1,227 @@
+package bcast
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/ipfs/go-cid"
+	logging "github.com/ipfs/go-log/v2"
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/go-state-types/abi"
+
+	"github.com/filecoin-project/lotus/chain/types"
+)
+
+var log = logging.Logger("sub-cb")
+
+const (
+	// GcSanityCheck determines the number of epochs in the past
+	// that will be garbage collected from the current epoch.
+	GcSanityCheck = 100
+	// GcLookback determines the number of epochs kept in the consistent
+	// broadcast cache.
+	GcLookback = 5
+	// GcDeepCheck determines the number of epochs in the past that
+	// we try cleaning in the deep garbage collection round.
+	GcDeepCheck = 2880 // (24h*60m*60s)/30s per epoch
+	// GcDeepInterval determines the number of epochs after which
+	// we are going to start a deeper garbage collection round.
+	GcDeepInterval = 1000
+)
+
+// blksInfo holds the CIDs seen for one epoch-ticket combination and the
+// context whose expiry (or cancellation) ends the delivery wait.
+type blksInfo struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+	blks   []cid.Cid
+}
+
+// bcastDict indexes the blksInfo of a single epoch by VRFProof.
+type bcastDict struct {
+	m map[string]*blksInfo
+}
+
+func (bd *bcastDict) load(key []byte) (*blksInfo, bool) {
+	v, ok := bd.m[string(key)]
+	return v, ok
+}
+
+// blkLen returns how many distinct blocks were seen for key; an unknown
+// key counts as zero instead of dereferencing a nil entry.
+func (bd *bcastDict) blkLen(key []byte) int {
+	if v, ok := bd.m[string(key)]; ok {
+		return len(v.blks)
+	}
+	return 0
+}
+
+func (bd *bcastDict) store(key []byte, d *blksInfo) {
+	bd.m[string(key)] = d
+}
+
+// ConsistentBCast tracks recent information about the
+// blocks and tickets received at different epochs
+type ConsistentBCast struct {
+	lk         sync.RWMutex
+	delay      time.Duration
+	m          map[abi.ChainEpoch]*bcastDict
+	lastDeepGc abi.ChainEpoch
+}
+
+func newBcastDict() *bcastDict {
+	return &bcastDict{m: make(map[string]*blksInfo)}
+}
+
+// BCastKey returns the key under which a block is tracked within its
+// epoch: the VRFProof of its ticket.
+func BCastKey(bh *types.BlockHeader) []byte {
+	return bh.Ticket.VRFProof
+}
+
+// NewConsistentBCast creates an empty broadcast tracker that holds blocks
+// back for delay before they are delivered.
+func NewConsistentBCast(delay time.Duration) *ConsistentBCast {
+	return &ConsistentBCast{
+		delay: delay,
+		m:     make(map[abi.ChainEpoch]*bcastDict),
+	}
+}
+
+func cidExists(cids []cid.Cid, c cid.Cid) bool {
+	for _, v := range cids {
+		if v == c {
+			return true
+		}
+	}
+	return false
+}
+
+// eqErr cancels the delivery wait of every block sharing this entry and
+// returns the equivocation error to report.
+func (bInfo *blksInfo) eqErr() error {
+	bInfo.cancel()
+	return xerrors.Errorf("different blocks with the same ticket already seen")
+}
+
+// Len returns the number of epochs currently tracked.
+func (cb *ConsistentBCast) Len() int {
+	cb.lk.RLock()
+	defer cb.lk.RUnlock()
+	return len(cb.m)
+}
+
+// RcvBlock is called every time a new block is received through the network.
+//
+// This function keeps track of all the blocks with a specific VRFProof received
+// for the same height. Every time a new block with a VRFProof not seen at certain
+// height is received, a new timer is triggered to wait for the delay time determined by
+// the consistent broadcast before informing the syncer. During this time, if a new
+// block with the same VRFProof for that height is received, it means a miner is
+// trying to equivocate, and both blocks are discarded.
+//
+// The delay time should be set to a value high enough to allow any block sent for
+// certain epoch to be propagated to a large amount of miners in the network.
+func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) {
+	cb.lk.Lock()
+	defer cb.lk.Unlock()
+
+	dict, ok := cb.m[blk.Header.Height]
+	if !ok {
+		dict = newBcastDict()
+		cb.m[blk.Header.Height] = dict
+	}
+
+	key := BCastKey(blk.Header)
+	blkCid := blk.Cid()
+
+	bInfo, ok := dict.load(key)
+	if !ok {
+		// First block for this epoch-ticket combination: start the delivery timer.
+		dctx, cancel := context.WithTimeout(ctx, cb.delay)
+		dict.store(key, &blksInfo{dctx, cancel, []cid.Cid{blkCid}})
+		return
+	}
+
+	if len(bInfo.blks) > 1 {
+		log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
+		return
+	}
+
+	if !cidExists(bInfo.blks, blkCid) {
+		dict.store(key, &blksInfo{bInfo.ctx, bInfo.cancel, append(bInfo.blks, blkCid)})
+		// By calling bInfo.eqErr() inside this log we cancel the context for all blocks waiting for
+		// the epoch-ticket combination making them to fail and not be sent to the syncer, as
+		// a potential equivocation is detected.
+		log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
+	}
+}
+
+// WaitForDelivery is called before informing the syncer about a new block
+// to check if the consistent broadcast delay triggered or if the block should
+// be held off for a bit more time.
+//
+// It returns an error if the block was never registered through RcvBlock or
+// if a second block with the same VRFProof was seen for the epoch.
+func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error {
+	key := BCastKey(bh)
+
+	cb.lk.RLock()
+	var bInfo *blksInfo
+	dict, ok := cb.m[bh.Height]
+	if ok {
+		// Only look the ticket up when the epoch is tracked; a missing
+		// epoch yields a nil dictionary.
+		bInfo, ok = dict.load(key)
+	}
+	cb.lk.RUnlock()
+	if !ok {
+		return xerrors.Errorf("something went wrong, unknown block with Epoch + VRFProof (epoch=%d, proof=%x) in consistent broadcast storage", bh.Height, key)
+	}
+
+	// Wait for the timeout
+	<-bInfo.ctx.Done()
+
+	// RcvBlock may still be recording blocks for this ticket, so the
+	// final count has to be read under the lock.
+	cb.lk.RLock()
+	defer cb.lk.RUnlock()
+	if dict.blkLen(key) > 1 {
+		return xerrors.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height)
+	}
+	return nil
+}
+
+// GarbageCollect cleans the consistent broadcast cache periodically.
+//
+// A light garbage collection is triggered before every block delivery
+// while a deeper one is triggered once every GcDeepInterval epochs to
+// ensure that nothing was left behind.
+func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) {
+	cb.lk.Lock()
+	defer cb.lk.Unlock()
+
+	// Nothing is older than the lookback window yet.
+	if currEpoch <= GcLookback {
+		return
+	}
+
+	// perform a deeper sanity check every now and then
+	gcRange := GcSanityCheck
+	if currEpoch >= cb.lastDeepGc+GcDeepInterval {
+		gcRange = GcDeepCheck
+		cb.lastDeepGc = currEpoch
+	}
+
+	// keep the last GcLookback epochs and delete gcRange epochs
+	// before them as a sanity-check.
+	// Garbage collection is triggered before block delivery,
+	// and we use the sanity-check in case there were a few rounds
+	// without delivery, and the garbage collection wasn't triggered
+	// for a few epochs.
+	for i := 0; i < gcRange; i++ {
+		delete(cb.m, currEpoch-abi.ChainEpoch(GcLookback+i))
+	}
+}
diff --git a/chain/sub/bcast/consistent_test.go b/chain/sub/bcast/consistent_test.go
new file mode 100644
index 00000000000..8beb0574f36
--- /dev/null
+++ b/chain/sub/bcast/consistent_test.go
@@ -0,0 +1,237 @@
+package bcast_test
+
+import (
+	"context"
+	"crypto/rand"
+	"fmt"
+	mrand "math/rand"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/ipfs/go-cid"
+	"github.com/multiformats/go-multihash"
+	"github.com/stretchr/testify/require"
+
+	"github.com/filecoin-project/go-address"
+	"github.com/filecoin-project/go-state-types/abi"
+
+	"github.com/filecoin-project/lotus/chain/sub/bcast"
+	"github.com/filecoin-project/lotus/chain/types"
+)
+
+const TEST_DELAY = 1 * time.Second
+
+func TestSimpleDelivery(t *testing.T) {
+	cb := bcast.NewConsistentBCast(TEST_DELAY)
+	// Check that we wait for delivery.
+	start := time.Now()
+	testSimpleDelivery(t, cb, 100, 5)
+	since := time.Since(start)
+	require.GreaterOrEqual(t, since, TEST_DELAY)
+}
+
+func testSimpleDelivery(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.ChainEpoch, numBlocks int) {
+	ctx := context.Background()
+
+	wg := new(sync.WaitGroup)
+	// errs is appended to from several goroutines, so it is guarded by lk.
+	var lk sync.Mutex
+	errs := make([]error, 0)
+	wg.Add(numBlocks)
+	for i := 0; i < numBlocks; i++ {
+		go func(i int) {
+			defer wg.Done()
+			// Add a random delay in block reception
+			r := mrand.Intn(200)
+			time.Sleep(time.Duration(r) * time.Millisecond)
+			blk := newBlock(t, epoch, randomProof(t), []byte("test"+strconv.Itoa(i)))
+			cb.RcvBlock(ctx, blk)
+			err := cb.WaitForDelivery(blk.Header)
+			if err != nil {
+				lk.Lock()
+				errs = append(errs, err)
+				lk.Unlock()
+			}
+		}(i)
+	}
+	wg.Wait()
+
+	for _, v := range errs {
+		t.Fatalf("error in delivery: %s", v)
+	}
+}
+
+func TestSeveralEpochs(t *testing.T) {
+	cb := bcast.NewConsistentBCast(TEST_DELAY)
+	numEpochs := 6
+	wg := new(sync.WaitGroup)
+	wg.Add(numEpochs)
+	for i := 0; i < numEpochs; i++ {
+		go func(i int) {
+			defer wg.Done()
+			// Add a random delay between epochs
+			r := mrand.Intn(500)
+			time.Sleep(time.Duration(i)*TEST_DELAY + time.Duration(r)*time.Millisecond)
+			// Use at least one block per epoch: with none the epoch is never
+			// tracked and testEquivocation would wait forever on its WaitGroup.
+			rNumBlocks := mrand.Intn(5) + 1
+			flip, err := flipCoin(0.7)
+			require.NoError(t, err)
+			t.Logf("Running epoch %d with %d with equivocation=%v", i, rNumBlocks, !flip)
+			if flip {
+				testSimpleDelivery(t, cb, abi.ChainEpoch(i), rNumBlocks)
+			} else {
+				testEquivocation(t, cb, abi.ChainEpoch(i), rNumBlocks)
+			}
+			cb.GarbageCollect(abi.ChainEpoch(i))
+		}(i)
+	}
+	wg.Wait()
+	require.Equal(t, numEpochs, cb.Len())
+}
+
+// bias is expected to be 0-1
+func flipCoin(bias float32) (bool, error) {
+	if bias > 1 || bias < 0 {
+		return false, fmt.Errorf("wrong bias. expected (0,1)")
+	}
+	r := mrand.Intn(100)
+	return r < int(bias*100), nil
+}
+
+func testEquivocation(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.ChainEpoch, numBlocks int) {
+	ctx := context.Background()
+
+	wg := new(sync.WaitGroup)
+	// errs is appended to from several goroutines, so it is guarded by lk.
+	var lk sync.Mutex
+	errs := make([]error, 0)
+	wg.Add(numBlocks + 1)
+	for i := 0; i < numBlocks; i++ {
+		proof := randomProof(t)
+		// Valid blocks
+		go func(i int, proof []byte) {
+			defer wg.Done()
+			r := mrand.Intn(200)
+			time.Sleep(time.Duration(r) * time.Millisecond)
+			blk := newBlock(t, epoch, proof, []byte("valid"+strconv.Itoa(i)))
+			cb.RcvBlock(ctx, blk)
+			err := cb.WaitForDelivery(blk.Header)
+			if err != nil {
+				lk.Lock()
+				errs = append(errs, err)
+				lk.Unlock()
+			}
+		}(i, proof)
+
+		// Equivocation for the last block
+		if i == numBlocks-1 {
+			// Attempting equivocation
+			go func(i int, proof []byte) {
+				defer wg.Done()
+				// Use the same proof and the same epoch
+				blk := newBlock(t, epoch, proof, []byte("invalid"+strconv.Itoa(i)))
+				cb.RcvBlock(ctx, blk)
+				err := cb.WaitForDelivery(blk.Header)
+				// Equivocation detected
+				require.Error(t, err)
+			}(i, proof)
+		}
+	}
+	wg.Wait()
+
+	// The equivocation was detected in time, so only the valid block
+	// sharing its ticket with the equivocated one failed delivery.
+	require.Len(t, errs, 1)
+}
+
+func TestEquivocation(t *testing.T) {
+	cb := bcast.NewConsistentBCast(TEST_DELAY)
+	testEquivocation(t, cb, 100, 5)
+}
+
+func TestFailedEquivocation(t *testing.T) {
+	cb := bcast.NewConsistentBCast(TEST_DELAY)
+	ctx := context.Background()
+	numBlocks := 5
+
+	wg := new(sync.WaitGroup)
+	// errs is appended to from several goroutines, so it is guarded by lk.
+	var lk sync.Mutex
+	errs := make([]error, 0)
+	wg.Add(numBlocks + 1)
+	for i := 0; i < numBlocks; i++ {
+		proof := randomProof(t)
+		// Valid blocks
+		go func(i int, proof []byte) {
+			defer wg.Done()
+			r := mrand.Intn(200)
+			time.Sleep(time.Duration(r) * time.Millisecond)
+			blk := newBlock(t, 100, proof, []byte("valid"+strconv.Itoa(i)))
+			cb.RcvBlock(ctx, blk)
+			err := cb.WaitForDelivery(blk.Header)
+			if err != nil {
+				lk.Lock()
+				errs = append(errs, err)
+				lk.Unlock()
+			}
+		}(i, proof)
+
+		// Equivocation for the last block
+		if i == numBlocks-1 {
+			// Attempting equivocation
+			go func(i int, proof []byte) {
+				defer wg.Done()
+				// The equivocated block arrives late
+				time.Sleep(2 * TEST_DELAY)
+				// Use the same proof and the same epoch
+				blk := newBlock(t, 100, proof, []byte("invalid"+strconv.Itoa(i)))
+				cb.RcvBlock(ctx, blk)
+				err := cb.WaitForDelivery(blk.Header)
+				// Equivocation detected
+				require.Error(t, err)
+			}(i, proof)
+		}
+	}
+	wg.Wait()
+
+	// The equivocated block arrived too late, so
+	// we delivered all the valid blocks.
+	require.Len(t, errs, 0)
+}
+
+func randomProof(t *testing.T) []byte {
+	proof := make([]byte, 10)
+	_, err := rand.Read(proof)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return proof
+}
+
+func newBlock(t *testing.T, epoch abi.ChainEpoch, proof []byte, mCidSeed []byte) *types.BlockMsg {
+	h, err := multihash.Sum(mCidSeed, multihash.SHA2_256, -1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	testCid := cid.NewCidV0(h)
+	addr, err := address.NewIDAddress(10)
+	if err != nil {
+		t.Fatal(err)
+	}
+	bh := &types.BlockHeader{
+		Miner:                 addr,
+		ParentStateRoot:       testCid,
+		ParentMessageReceipts: testCid,
+		Ticket: &types.Ticket{
+			VRFProof: proof,
+		},
+		Height:   epoch,
+		Messages: testCid,
+	}
+	return &types.BlockMsg{
+		Header: bh,
+	}
+}
diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go
index f641f0ff933..87598bd2e03 100644
--- a/chain/sub/incoming.go
+++ b/chain/sub/incoming.go
@@ -27,6 +27,7 @@ import (
 	"github.com/filecoin-project/lotus/chain/consensus"
 	"github.com/filecoin-project/lotus/chain/messagepool"
 	"github.com/filecoin-project/lotus/chain/store"
+	"github.com/filecoin-project/lotus/chain/sub/bcast"
 	"github.com/filecoin-project/lotus/chain/sub/ratelimit"
 	"github.com/filecoin-project/lotus/chain/types"
 	"github.com/filecoin-project/lotus/metrics"
@@ -43,10 +44,11 @@ var msgCidPrefix = cid.Prefix{
 	MhLength: 32,
 }
 
-func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
+func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self peer.ID, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
 	// Timeout after (block time + propagation delay). This is useless at
 	// this point.
 	timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
+	cb := bcast.NewConsistentBCast(build.CBDeliveryDelay)
 
 	for {
 		msg, err := bsub.Next(ctx)
@@ -67,6 +69,9 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
 
 		src := msg.GetFrom()
 
+		// Notify consistent broadcast about a new block
+		cb.RcvBlock(ctx, blk)
+
 		go func() {
 			ctx, cancel := context.WithTimeout(ctx, timeout)
 			defer cancel()
@@ -102,6 +107,20 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
 				log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
 			}
 
+			// When we propose a new block ourselves, the proposed block also gets here through SyncSubmitBlock.
+			// If we are the block proposers we don't need to wait for delivery, we know the blocks are
+			// honest.
+			if src != self {
+				log.Debugf("Waiting for consistent broadcast of block in height: %v", blk.Header.Height)
+				if err := cb.WaitForDelivery(blk.Header); err != nil {
+					log.Errorf("not informing syncer about new block, potential equivocation detected (cid: %s, source: %s): %s; ", blk.Header.Cid(), src, err)
+					return
+				}
+			}
+			// Garbage collect the broadcast state
+			cb.GarbageCollect(blk.Header.Height)
+			log.Debugf("Block in height %v delivered successfully (cid=%s)", blk.Header.Height, blk.Cid())
+
 			if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
 				Header:      blk.Header,
 				BlsMessages: bmsgs,
diff --git a/chain/sync_test.go b/chain/sync_test.go
index a86d42f17e6..1f32d96ec9f 100644
--- a/chain/sync_test.go
+++ b/chain/sync_test.go
@@ -45,6 +45,11 @@ func init() {
 	policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
 	policy.SetConsensusMinerMinPower(abi.NewStoragePower(2048))
 	policy.SetMinVerifiedDealSize(abi.NewStoragePower(256))
+
+	// these tests assume really fast block times. shrinking
+	// the consistent broadcast delay to a couple of milliseconds
+	// keeps it from adding an unnecessary overhead.
+	build.CBDeliveryDelay = 2 * time.Millisecond
 }
 
 const source = 0
diff --git a/itests/kit/init.go b/itests/kit/init.go
index 9397c53a218..dbcb49aae31 100644
--- a/itests/kit/init.go
+++ b/itests/kit/init.go
@@ -40,6 +40,11 @@ func init() {
 
 	build.InsecurePoStValidation = true
 
+	// Disabling consistent broadcast in itests. Many tests use really fast
+	// block times, and adding this additional delay for block delivery
+	// would make these tests fail.
+	build.CBDeliveryDelay = 0
+
 	if err := os.Setenv("BELLMAN_NO_GPU", "1"); err != nil {
 		panic(fmt.Sprintf("failed to set BELLMAN_NO_GPU env variable: %s", err))
 	}
diff --git a/node/modules/services.go b/node/modules/services.go
index 9acebd07105..750d22fbaff 100644
--- a/node/modules/services.go
+++ b/node/modules/services.go
@@ -167,7 +167,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx,
 		panic(err)
 	}
 
-	go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
+	go sub.HandleIncomingBlocks(ctx, blocksub, h.ID(), s, bserv, h.ConnManager())
 }
 
 func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {