Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into blocksync-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Sep 9, 2020
2 parents b33db9c + de772bf commit cb3b0ab
Show file tree
Hide file tree
Showing 21 changed files with 264 additions and 84 deletions.
15 changes: 15 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
## filecoin-project/lotus CODEOWNERS
## Refer to https://docs.github.com/en/github/creating-cloning-and-archiving-repositories/about-code-owners.
##
## These users or groups will be automatically assigned as reviewers every time
## a PR is submitted that modifies code in the specified locations.
##
## The Lotus repo configuration requires that at least ONE codeowner approves
## the PR before merging.

### Global owners.
* @magik6k @whyrusleeping

### Conformance testing.
conformance/ @raulk
extern/test-vectors @raulk
21 changes: 21 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"fmt"
"time"

"github.com/filecoin-project/specs-actors/actors/runtime/proof"
Expand Down Expand Up @@ -709,8 +710,28 @@ const (
StageMessages
StageSyncComplete
StageSyncErrored
StageFetchingMessages
)

func (v SyncStateStage) String() string {
switch v {
case StageHeaders:
return "header sync"
case StagePersistHeaders:
return "persisting headers"
case StageMessages:
return "message sync"
case StageSyncComplete:
return "complete"
case StageSyncErrored:
return "error"
case StageFetchingMessages:
return "fetching messages"
default:
return fmt.Sprintf("<unknown: %d>", v)
}
}

type MpoolChange int

const (
Expand Down
2 changes: 1 addition & 1 deletion build/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func buildType() string {
}

// BuildVersion is the local build version, set by build system
const BuildVersion = "0.6.1"
const BuildVersion = "0.6.2-rc1"

func UserVersion() string {
return BuildVersion + buildType() + CurrentCommit
Expand Down
4 changes: 4 additions & 0 deletions chain/messagepool/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ func (mp *MessagePool) republishPendingMessages() error {
mp.curTsLk.Unlock()
return xerrors.Errorf("computing basefee: %w", err)
}

baseFeeLowerBound := types.BigDiv(baseFee, baseFeeLowerBoundFactor)
if baseFeeLowerBoundFactor.LessThan(minimumBaseFee) {
baseFeeLowerBound = minimumBaseFee
}

pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.lk.Lock()
Expand Down
71 changes: 71 additions & 0 deletions chain/messagepool/selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package messagepool
import (
"context"
"math/big"
"math/rand"
"sort"
"time"

Expand Down Expand Up @@ -304,6 +305,69 @@ tailLoop:
log.Infow("pack tail chains done", "took", dt)
}

// if we have gasLimit to spare, pick some random (non-negative) chains to fill the block
// we pick randomly so that we minimize the probability of duplication among all miners
startRandom := time.Now()
if gasLimit >= minGas {
shuffleChains(chains)

for _, chain := range chains {
// have we filled the block
if gasLimit < minGas {
break
}

// has it been merged or invalidated?
if chain.merged || !chain.valid {
continue
}

// is it negative?
if !allowNegativeChains(curTs.Height()) && chain.gasPerf < 0 {
continue
}

// compute the dependencies that must be merged and the gas limit including deps
chainGasLimit := chain.gasLimit
depGasLimit := int64(0)
var chainDeps []*msgChain
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
chainDeps = append(chainDeps, curChain)
chainGasLimit += curChain.gasLimit
depGasLimit += curChain.gasLimit
}

// do the deps fit? if the deps won't fit, invalidate the chain
if depGasLimit > gasLimit {
chain.Invalidate()
continue
}

// do they fit as is? if it doesn't, trim to make it fit if possible
if chainGasLimit > gasLimit {
chain.Trim(gasLimit-depGasLimit, mp, baseFee, allowNegativeChains(curTs.Height()))

if !chain.valid {
continue
}
}

// include it together with all dependencies
for i := len(chainDeps) - 1; i >= 0; i-- {
curChain := chainDeps[i]
curChain.merged = true
result = append(result, curChain.msgs...)
}

chain.merged = true
result = append(result, chain.msgs...)
gasLimit -= chainGasLimit
}
}
if dt := time.Since(startRandom); dt > time.Millisecond {
log.Infow("pack random tail chains done", "took", dt)
}

return result, nil
}

Expand Down Expand Up @@ -852,3 +916,10 @@ func (mc *msgChain) BeforeEffective(other *msgChain) bool {
(mc.effPerf == other.effPerf && mc.gasPerf > other.gasPerf) ||
(mc.effPerf == other.effPerf && mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
}

func shuffleChains(lst []*msgChain) {
for i := range lst {
j := rand.Intn(i + 1)
lst[i], lst[j] = lst[j], lst[i]
}
}
10 changes: 5 additions & 5 deletions chain/stmgr/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import (
"golang.org/x/xerrors"
)

var ForksAtHeight = map[abi.ChainEpoch]func(context.Context, *StateManager, types.StateTree) error{
var ForksAtHeight = map[abi.ChainEpoch]func(context.Context, *StateManager, types.StateTree, *types.TipSet) error{
build.UpgradeBreezeHeight: UpgradeFaucetBurnRecovery,
}

func (sm *StateManager) handleStateForks(ctx context.Context, st types.StateTree, height abi.ChainEpoch) (err error) {
func (sm *StateManager) handleStateForks(ctx context.Context, st types.StateTree, height abi.ChainEpoch, ts *types.TipSet) (err error) {
f, ok := ForksAtHeight[height]
if ok {
err := f(ctx, sm, st)
err := f(ctx, sm, st, ts)
if err != nil {
return err
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func doTransfer(tree types.StateTree, from, to address.Address, amt abi.TokenAmo
return nil
}

func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, tree types.StateTree) error {
func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, tree types.StateTree, ts *types.TipSet) error {
// Some initial parameters
FundsForMiners := types.FromFil(1_000_000)
LookbackEpoch := abi.ChainEpoch(32000)
Expand All @@ -91,7 +91,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, tree types
}

// Grab lookback state for account checks
lbts, err := sm.ChainStore().GetTipsetByHeight(ctx, LookbackEpoch, nil, false)
lbts, err := sm.ChainStore().GetTipsetByHeight(ctx, LookbackEpoch, ts, false)
if err != nil {
return xerrors.Errorf("failed to get tipset at lookback height: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion chain/stmgr/forks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestForkHeightTriggers(t *testing.T) {
t.Fatal(err)
}

stmgr.ForksAtHeight[testForkHeight] = func(ctx context.Context, sm *StateManager, st types.StateTree) error {
stmgr.ForksAtHeight[testForkHeight] = func(ctx context.Context, sm *StateManager, st types.StateTree, ts *types.TipSet) error {
cst := cbor.NewCborStore(sm.ChainStore().Blockstore())

act, err := st.GetActor(taddr)
Expand Down
6 changes: 3 additions & 3 deletions chain/stmgr/stmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (sm *StateManager) ExecutionTrace(ctx context.Context, ts *types.TipSet) (c

type ExecCallback func(cid.Cid, *types.Message, *vm.ApplyRet) error

func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEpoch, pstate cid.Cid, bms []store.BlockMessages, epoch abi.ChainEpoch, r vm.Rand, cb ExecCallback, baseFee abi.TokenAmount) (cid.Cid, cid.Cid, error) {
func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEpoch, pstate cid.Cid, bms []store.BlockMessages, epoch abi.ChainEpoch, r vm.Rand, cb ExecCallback, baseFee abi.TokenAmount, ts *types.TipSet) (cid.Cid, cid.Cid, error) {

vmopt := &vm.VMOpts{
StateBase: pstate,
Expand Down Expand Up @@ -201,7 +201,7 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp

for i := parentEpoch; i < epoch; i++ {
// handle state forks
err = sm.handleStateForks(ctx, vmi.StateTree(), i)
err = sm.handleStateForks(ctx, vmi.StateTree(), i, ts)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("error handling state forks: %w", err)
}
Expand Down Expand Up @@ -350,7 +350,7 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, ts *types.TipSet

baseFee := blks[0].ParentBaseFee

return sm.ApplyBlocks(ctx, parentEpoch, pstate, blkmsgs, blks[0].Height, r, cb, baseFee)
return sm.ApplyBlocks(ctx, parentEpoch, pstate, blkmsgs, blks[0].Height, r, cb, baseFee, ts)
}

func (sm *StateManager) parentState(ts *types.TipSet) cid.Cid {
Expand Down
2 changes: 1 addition & 1 deletion chain/stmgr/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch,

for i := ts.Height(); i < height; i++ {
// handle state forks
err = sm.handleStateForks(ctx, vmi.StateTree(), i)
err = sm.handleStateForks(ctx, vmi.StateTree(), i, ts)
if err != nil {
return cid.Undef, nil, xerrors.Errorf("error handling state forks: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,7 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*

// fills out each of the given tipsets with messages and calls the callback with it
func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipSet, cb func(context.Context, *store.FullTipSet) error) error {
ss := extractSyncState(ctx)
ctx, span := trace.StartSpan(ctx, "iterFullTipsets")
defer span.End()

Expand Down Expand Up @@ -1466,6 +1467,7 @@ mainLoop:

nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index

ss.SetStage(api.StageFetchingMessages)
var bstout []*exchange.CompactedMessages
for len(bstout) < batchSize {
next := headers[nextI]
Expand All @@ -1485,6 +1487,7 @@ mainLoop:
bstout = append(bstout, bstips...)
nextI += len(bstips)
}
ss.SetStage(api.StageMessages)

for bsi := 0; bsi < len(bstout); bsi++ {
// temp storage so we don't persist data we dont want to
Expand Down
18 changes: 0 additions & 18 deletions chain/syncstate.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package chain

import (
"fmt"
"sync"
"time"

Expand All @@ -12,23 +11,6 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)

func SyncStageString(v api.SyncStateStage) string {
switch v {
case api.StageHeaders:
return "header sync"
case api.StagePersistHeaders:
return "persisting headers"
case api.StageMessages:
return "message sync"
case api.StageSyncComplete:
return "complete"
case api.StageSyncErrored:
return "error"
default:
return fmt.Sprintf("<unknown: %d>", v)
}
}

type SyncerState struct {
lk sync.Mutex
Target *types.TipSet
Expand Down
49 changes: 45 additions & 4 deletions cli/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import (
"strings"
"text/tabwriter"

"github.com/dustin/go-humanize"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/libp2p/go-libp2p-core/peer"
protocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/multiformats/go-multiaddr"

"github.com/dustin/go-humanize"
"github.com/urfave/cli/v2"
"github.com/filecoin-project/go-address"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/addrutil"
)

Expand Down Expand Up @@ -141,7 +146,7 @@ var NetListen = &cli.Command{
var netConnect = &cli.Command{
Name: "connect",
Usage: "Connect to a peer",
ArgsUsage: "[peerMultiaddr]",
ArgsUsage: "[peerMultiaddr|minerActorAddress]",
Action: func(cctx *cli.Context) error {
api, closer, err := GetAPI(cctx)
if err != nil {
Expand All @@ -152,7 +157,43 @@ var netConnect = &cli.Command{

pis, err := addrutil.ParseAddresses(ctx, cctx.Args().Slice())
if err != nil {
return err
a, perr := address.NewFromString(cctx.Args().First())
if perr != nil {
return err
}

na, fc, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer fc()

mi, err := na.StateMinerInfo(ctx, a, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting miner info: %w", err)
}

if mi.PeerId == nil {
return xerrors.Errorf("no PeerID for miner")
}
multiaddrs := make([]multiaddr.Multiaddr, 0, len(mi.Multiaddrs))
for i, a := range mi.Multiaddrs {
maddr, err := multiaddr.NewMultiaddrBytes(a)
if err != nil {
log.Warnf("parsing multiaddr %d (%x): %s", i, a, err)
continue
}
multiaddrs = append(multiaddrs, maddr)
}

pi := peer.AddrInfo{
ID: *mi.PeerId,
Addrs: multiaddrs,
}

fmt.Printf("%s -> %s\n", a, pi)

pis = append(pis, pi)
}

for _, pi := range pis {
Expand Down
5 changes: 2 additions & 3 deletions cli/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
)

var syncCmd = &cli.Command{
Expand Down Expand Up @@ -61,7 +60,7 @@ var syncStatusCmd = &cli.Command{
fmt.Printf("\tBase:\t%s\n", base)
fmt.Printf("\tTarget:\t%s (%d)\n", target, theight)
fmt.Printf("\tHeight diff:\t%d\n", heightDiff)
fmt.Printf("\tStage: %s\n", chain.SyncStageString(ss.Stage))
fmt.Printf("\tStage: %s\n", ss.Stage)
fmt.Printf("\tHeight: %d\n", ss.Height)
if ss.End.IsZero() {
if !ss.Start.IsZero() {
Expand Down Expand Up @@ -186,7 +185,7 @@ func SyncWait(ctx context.Context, napi api.FullNode) error {
theight = ss.Target.Height()
}

fmt.Printf("\r\x1b[2KWorker %d: Target Height: %d\tTarget: %s\tState: %s\tHeight: %d", working, theight, target, chain.SyncStageString(ss.Stage), ss.Height)
fmt.Printf("\r\x1b[2KWorker %d: Target Height: %d\tTarget: %s\tState: %s\tHeight: %d", working, theight, target, ss.Stage, ss.Height)

if time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs) {
fmt.Println("\nDone!")
Expand Down
Loading

0 comments on commit cb3b0ab

Please sign in to comment.