Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:networking: (Synchronous) Consistent Broadcast for Filecoin EC #9858

Merged
merged 29 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c03ad9d
wip: draft impl of consistent bcast
adlrocha Sep 12, 2022
76031d7
minor fixes. unit tests added
adlrocha Sep 13, 2022
1dadff3
added garbage collection test. minor fix
adlrocha Sep 13, 2022
d1a4f1d
fixed bugs in consistent broadcast integration
adlrocha Sep 14, 2022
72c80a3
fixed unit tests
adlrocha Sep 16, 2022
0209052
minor changes
adlrocha Sep 16, 2022
749d133
added testground test case for epoch boundary attack
adlrocha Sep 19, 2022
8f48631
index new testcase
adlrocha Sep 19, 2022
ebafb65
minor fixes. sync mining.
adlrocha Sep 22, 2022
35f8abc
minor fix in composition
adlrocha Sep 23, 2022
bf65418
sync mining and gossipsub attacker dependency
adlrocha Sep 26, 2022
894da94
Merge branch 'master' into adlrocha/consistent-bcast
adlrocha Dec 13, 2022
91bd679
consistent broadcast delay as build param
adlrocha Dec 13, 2022
f2cc452
remove error from rcvBlock. type/docs gen
adlrocha Dec 13, 2022
939e515
fix race in cb cache
adlrocha Dec 13, 2022
d574d04
set small cb delivery delay for paych itests
adlrocha Dec 13, 2022
3988cc9
disabling cb delivery delay for sync tests
adlrocha Dec 13, 2022
c11ffa5
address review
adlrocha Mar 10, 2023
f59c246
Update chain/sub/bcast/consistent.go
adlrocha Mar 16, 2023
8d260d7
address review
adlrocha Mar 16, 2023
90c2f9d
minor fix
adlrocha Mar 16, 2023
fa7e1ef
set CB delay to 2 secs
adlrocha Mar 20, 2023
92f6d3e
global locking strategy for blockInfo map
adlrocha Mar 28, 2023
df82a82
add comments
adlrocha Mar 28, 2023
1a771e4
include a deeper gc round
adlrocha Mar 28, 2023
b7c297c
Merge branch 'master' into adlrocha/consistent-bcast
adlrocha Mar 28, 2023
f24fc83
add CB param to all testnet builds
adlrocha Mar 28, 2023
103d786
return CBDeliveryDelay into a var
adlrocha Mar 29, 2023
682ddf6
Update chain/sub/bcast/consistent.go
adlrocha Mar 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build/params_2k.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package build
import (
"os"
"strconv"
"time"

"github.com/ipfs/go-cid"

Expand Down Expand Up @@ -139,3 +140,7 @@ const BootstrapPeerThreshold = 1
const Eip155ChainId = 31415926

var WhitelistedBlock = cid.Undef

// Reducing the delivery delay for equivocation of
// consistent broadcast to just half a second.
var CBDeliveryDelay = 500 * time.Milisecond
7 changes: 7 additions & 0 deletions build/params_butterfly.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package build

import (
"time"

"github.com/ipfs/go-cid"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -87,3 +89,8 @@ const BootstrapPeerThreshold = 2
const Eip155ChainId = 3141592

var WhitelistedBlock = cid.Undef

// CBDeliveryDelay is the delay before delivery in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests.
var CBDeliveryDelay = 2 * time.Second
6 changes: 6 additions & 0 deletions build/params_calibnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package build
import (
"os"
"strconv"
"time"

"github.com/ipfs/go-cid"

Expand Down Expand Up @@ -122,3 +123,8 @@ const BootstrapPeerThreshold = 4
const Eip155ChainId = 314159

var WhitelistedBlock = cid.Undef

// CBDeliveryDelay is the delay before delivery in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests.
var CBDeliveryDelay = 2 * time.Second
6 changes: 6 additions & 0 deletions build/params_interop.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package build
import (
"os"
"strconv"
"time"

"github.com/ipfs/go-cid"

Expand Down Expand Up @@ -128,3 +129,8 @@ const BootstrapPeerThreshold = 2
const Eip155ChainId = 3141592

var WhitelistedBlock = cid.Undef

// CBDeliveryDelay is the delay before delivery in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests.
var CBDeliveryDelay = 2 * time.Second
6 changes: 6 additions & 0 deletions build/params_mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"os"
"strconv"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
Expand Down Expand Up @@ -137,3 +138,8 @@ const Eip155ChainId = 314

// we skip checks on message validity in this block to sidestep the zero-bls signature
var WhitelistedBlock = MustParseCid("bafy2bzaceapyg2uyzk7vueh3xccxkuwbz3nxewjyguoxvhx77malc2lzn2ybi")

// CBDeliveryDelay is the delay before delivery in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests.
var CBDeliveryDelay = 2 * time.Second
adlrocha marked this conversation as resolved.
Show resolved Hide resolved
197 changes: 197 additions & 0 deletions chain/sub/bcast/consistent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package bcast

import (
"context"
"sync"
"time"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/chain/types"
)

var log = logging.Logger("sub-cb")

const (
	// GcSanityCheck determines the number of epochs in the past
	// that will be garbage collected from the current epoch.
	GcSanityCheck = 100
	// GcLookback determines the number of epochs kept in the consistent
	// broadcast cache.
	GcLookback = 5
	// GcDeepCheck determines the number of epochs in the past that
	// we try cleaning in the deep garbage collection round.
	GcDeepCheck = 2880 // (24h*60m*60s)/30s per epoch
	// GcDeepInterval determines the number of epochs after which
	// we are going to start a deeper garbage collection round.
	GcDeepInterval = 1000
)

type blksInfo struct {
ctx context.Context
cancel context.CancelFunc
blks []cid.Cid
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure about the thread-safety of this blks slice. We seem to be reading & modifying it somewhat unsafely, and while it's probably the case that the system won't ever race here, I'm...not sure.

Can we reason through this a bit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am using a sync.Map to prevent parallel read/writes and avoid requiring a specific lock for every blksInfo which may be harder to handle. I assume blksInfo as a single unit of data, and never read the data structure of blocks directly. Thus the load, store methods.

That being said, I may have missed some subtle detail. Let me know if this makes sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So my concern is that we load the blksInfo out of the sync.Map, but then read and modify the blks field. I think if we have 2 calls to RcvBlock, they both load the same bInfo object (safely). After that, however, they're gonna be concurrently accessing the blks slice, which I don't think is safe (eg. iterating over the blks field, while it's also being modified).

I'm not 100% sure that I'm right, and even if so, I'm not sure whether simultaneous calls to RcvBlock are possible (need to 👀 more), but does that make sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, looking at the code closer, we don't actually ever modify it. So we're probably okay here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are OK on that, but I think you are right that there may be a race here as one routine may load the dict and another one may load it in parallel before the previous one has been able to store any potential changes.

I think this is fixed by moving this lock after the load statement in line 125. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adlrocha I don't think that's sufficient. I think it needs to be held until no more writing happens, which is the end of the method (as currently written). Consider the following situation:

  • 2 blocks A and B come in with the same VRFProof.
  • A gets the lock first, creates an empty BCastDict (line 119), doesn't find a match for the common key on line 125, drops the lock
  • B gets the lock, loads the empty BCastDict created by A, also doesn't find a match for the common key on line 125, drops the lock
  • A stores blkACid as the only entry corresponding to key on line 141, returns
  • B overwrites that, storing blkBCid as the only entry corresponding to key on line 141, returns
  • Both A and B succeed their respective calls to WaitForDelivery

}

// bcastDict tracks, for a single epoch, the blocks received for each
// ticket (VRFProof) key. It is a plain map: concurrent access is
// expected to be guarded by the ConsistentBCast lock.
type bcastDict struct {
	m map[string]*blksInfo
}

// load returns the block information stored for the given ticket key,
// reporting with the second return value whether the key was found.
func (bd *bcastDict) load(key []byte) (*blksInfo, bool) {
	bInfo, found := bd.m[string(key)]
	return bInfo, found
}

// blkLen returns the number of different blocks seen so far for the
// given ticket key. It returns 0 for an unknown key instead of
// dereferencing the missing entry (which would panic on a nil *blksInfo).
func (bd *bcastDict) blkLen(key []byte) int {
	bInfo, ok := bd.m[string(key)]
	if !ok {
		return 0
	}
	return len(bInfo.blks)
}

// store saves the block information for the given ticket key,
// overwriting any previous entry for that key.
func (bd *bcastDict) store(key []byte, d *blksInfo) {
	bd.m[string(key)] = d
}

// ConsistentBCast tracks recent information about the
// blocks and tickets received at different epochs
type ConsistentBCast struct {
	lk         sync.RWMutex  // guards m and lastDeepGc
	delay      time.Duration // consistent-broadcast wait before delivering a block to the syncer
	m          map[abi.ChainEpoch]*bcastDict
	lastDeepGc abi.ChainEpoch // last epoch at which a deep garbage collection ran
}

// newBcastDict allocates an empty per-epoch broadcast dictionary.
func newBcastDict() *bcastDict {
	d := &bcastDict{}
	d.m = map[string]*blksInfo{}
	return d
}

// BCastKey returns the key used to index a block within an epoch of the
// consistent broadcast cache: the VRFProof of the block's ticket.
func BCastKey(bh *types.BlockHeader) []byte {
	return bh.Ticket.VRFProof
}

// NewConsistentBCast creates a new consistent broadcast cache that waits
// for the given delay before considering a received block deliverable.
func NewConsistentBCast(delay time.Duration) *ConsistentBCast {
	return &ConsistentBCast{
		delay: delay,
		m:     make(map[abi.ChainEpoch]*bcastDict),
	}
}

// cidExists reports whether c is already present in cids.
func cidExists(cids []cid.Cid, c cid.Cid) bool {
	for i := range cids {
		if cids[i] == c {
			return true
		}
	}
	return false
}

// eqErr cancels the context shared by every block recorded for this
// epoch-ticket combination — unblocking any WaitForDelivery call waiting
// on it — and returns the equivocation error. Note the deliberate side
// effect: calling eqErr marks the whole entry as equivocated.
func (bInfo *blksInfo) eqErr() error {
	bInfo.cancel()
	return xerrors.Errorf("different blocks with the same ticket already seen")
}

// Len returns the number of epochs currently tracked in the cache.
func (cb *ConsistentBCast) Len() int {
	cb.lk.RLock()
	defer cb.lk.RUnlock()
	return len(cb.m)
}

// RcvBlock is called every time a new block is received through the network.
//
// This function keeps track of all the blocks with a specific VRFProof received
// for the same height. Every time a new block with a VRFProof not seen at certain
// height is received, a new timer is triggered to wait for the delay time determined by
// the consistent broadcast before informing the syncer. During this time, if a new
// block with the same VRFProof for that height is received, it means a miner is
// trying to equivocate, and both blocks are discarded.
//
// The delay time should be set to a value high enough to allow any block sent for
// certain epoch to be propagated to a large amount of miners in the network.
func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) {
adlrocha marked this conversation as resolved.
Show resolved Hide resolved
cb.lk.Lock()
defer cb.lk.Unlock()
bcastDict, ok := cb.m[blk.Header.Height]
if !ok {
bcastDict = newBcastDict()
cb.m[blk.Header.Height] = bcastDict
}

key := BCastKey(blk.Header)
blkCid := blk.Cid()

bInfo, ok := bcastDict.load(key)
if ok {
if len(bInfo.blks) > 1 {
log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
adlrocha marked this conversation as resolved.
Show resolved Hide resolved
return
}

if !cidExists(bInfo.blks, blkCid) {
bcastDict.store(key, &blksInfo{bInfo.ctx, bInfo.cancel, append(bInfo.blks, blkCid)})
// By calling bInfo.eqErr() inside this log we cancel the context for all blocks waiting for
// the epoch-ticket combination making them to fail and not be sent to the syncer, as
// a potential equivocation is detected.
log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
return
}
return
}

ctx, cancel := context.WithTimeout(ctx, cb.delay)
bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}})
}

// WaitForDelivery is called before informing the syncer about a new block
// to check if the consistent broadcast delay triggered or if the block should
// be held off for a bit more time.
//
// It returns an error if the block is unknown to the cache, or if an
// equivocation (a different block with the same ticket for this epoch)
// was detected while waiting.
func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error {
	key := BCastKey(bh)

	cb.lk.RLock()
	bcastDict, ok := cb.m[bh.Height]
	var bInfo *blksInfo
	if ok {
		// Only dereference the dictionary when the epoch exists:
		// calling load on a nil *bcastDict would panic accessing its map.
		bInfo, ok = bcastDict.load(key)
	}
	cb.lk.RUnlock()
	if !ok {
		return xerrors.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key)
	}

	// Wait for the timeout
	<-bInfo.ctx.Done()

	// Re-check under the lock: RcvBlock may still be recording an
	// equivocated block concurrently, and reading the dictionary
	// without the lock would be a data race.
	cb.lk.RLock()
	defer cb.lk.RUnlock()
	if bcastDict.blkLen(key) > 1 {
		return xerrors.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height)
	}
	return nil
}

// GarbageCollect cleans the consistent broadcast cache periodically.
//
// A light garbage collection is triggered before every block delivery,
// while a deeper one is triggered once every GcDeepInterval epochs to
// ensure that nothing was left behind.
func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) {
	cb.lk.Lock()
	defer cb.lk.Unlock()

	// Perform a deeper sanity check every now and then.
	// NOTE(review): the previous condition (lastDeepGc+GcDeepInterval > currEpoch)
	// was inverted: it ran the deep round on every call for young chains and
	// never once currEpoch exceeded the interval. Deep GC should fire only
	// after GcDeepInterval epochs have passed since the last deep round.
	gcRange := GcSanityCheck
	if cb.lastDeepGc+GcDeepInterval < currEpoch {
		gcRange = GcDeepCheck
		cb.lastDeepGc = currEpoch
	}

	// Nothing to clean while the chain is shorter than the lookback window
	// (the check is loop-invariant, so it is hoisted out of the loop).
	if currEpoch <= GcLookback {
		return
	}

	// Keep the last GcLookback epochs and delete gcRange epochs before that
	// as a sanity-check. Garbage collection is triggered before block
	// delivery, so if there were a few rounds without delivery, some older
	// epochs may have been left behind.
	for i := 0; i < gcRange; i++ {
		delete(cb.m, currEpoch-abi.ChainEpoch(GcLookback+i))
	}
}
Loading