Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce the ability to mock time. #2362

Merged
merged 6 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions api/test/window_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"

"os"
"strings"
"testing"
Expand Down Expand Up @@ -35,14 +37,14 @@ func TestPledgeSector(t *testing.T, b APIBuilder, blocktime time.Duration, nSect
if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
build.Clock.Sleep(time.Second)

mine := true
done := make(chan struct{})
go func() {
defer close(done)
for mine {
time.Sleep(blocktime)
build.Clock.Sleep(blocktime)
if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil {
t.Error(err)
}
Expand All @@ -69,7 +71,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n i
break
}

time.Sleep(100 * time.Millisecond)
build.Clock.Sleep(100 * time.Millisecond)
}

fmt.Printf("All sectors is fsm\n")
Expand All @@ -94,7 +96,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n i
}
}

time.Sleep(100 * time.Millisecond)
build.Clock.Sleep(100 * time.Millisecond)
fmt.Printf("WaitSeal: %d\n", len(s))
}
}
Expand All @@ -115,14 +117,14 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector
if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
build.Clock.Sleep(time.Second)

mine := true
done := make(chan struct{})
go func() {
defer close(done)
for mine {
time.Sleep(blocktime)
build.Clock.Sleep(blocktime)
if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -150,7 +152,7 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector
if head.Height()%100 == 0 {
fmt.Printf("@%d\n", head.Height())
}
time.Sleep(blocktime)
build.Clock.Sleep(blocktime)
}

p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK)
Expand Down
10 changes: 10 additions & 0 deletions build/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package build

import "github.com/raulk/clock"

// Clock is the global clock for the system. In standard builds,
// we use a real-time clock, which maps to the `time` package.
//
// Tests that need control of time can replace this variable with
// clock.NewMock().
var Clock = clock.New()
9 changes: 5 additions & 4 deletions chain/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package beacon

import (
"context"
"time"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
)

var log = logging.Logger("beacon")
Expand Down Expand Up @@ -52,7 +53,7 @@ func ValidateBlockValues(b RandomBeacon, h *types.BlockHeader, prevEntry types.B
}

func BeaconEntriesForBlock(ctx context.Context, beacon RandomBeacon, round abi.ChainEpoch, prev types.BeaconEntry) ([]types.BeaconEntry, error) {
start := time.Now()
start := build.Clock.Now()

maxRound := beacon.MaxBeaconRoundForEpoch(round, prev)
if maxRound == prev.Round {
Expand Down Expand Up @@ -81,7 +82,7 @@ func BeaconEntriesForBlock(ctx context.Context, beacon RandomBeacon, round abi.C
}
}

log.Debugw("fetching beacon entries", "took", time.Since(start), "numEntries", len(out))
log.Debugw("fetching beacon entries", "took", build.Clock.Since(start), "numEntries", len(out))
reverse(out)
return out, nil
}
Expand Down
5 changes: 3 additions & 2 deletions chain/beacon/drand/drand.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/filecoin-project/specs-actors/actors/abi"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
Expand Down Expand Up @@ -131,7 +132,7 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re
}

go func() {
start := time.Now()
start := build.Clock.Now()
log.Infow("start fetching randomness", "round", round)
resp, err := db.client.Get(ctx, round)

Expand All @@ -142,7 +143,7 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re
br.Entry.Round = resp.Round()
br.Entry.Data = resp.Signature()
}
log.Infow("done fetching randomness", "round", round, "took", time.Since(start))
log.Infow("done fetching randomness", "round", round, "took", build.Clock.Since(start))
out <- br
close(out)
}()
Expand Down
5 changes: 3 additions & 2 deletions chain/block_receipt_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/hashicorp/golang-lru"
peer "github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -37,14 +38,14 @@ func (brt *blockReceiptTracker) Add(p peer.ID, ts *types.TipSet) {
if !ok {
pset := &peerSet{
peers: map[peer.ID]time.Time{
p: time.Now(),
p: build.Clock.Now(),
},
}
brt.cache.Add(ts.Key(), pset)
return
}

val.(*peerSet).peers[p] = time.Now()
val.(*peerSet).peers[p] = build.Clock.Now()
}

func (brt *blockReceiptTracker) GetPeers(ts *types.TipSet) []peer.ID {
Expand Down
3 changes: 2 additions & 1 deletion chain/blocksync/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

cborutil "github.com/filecoin-project/go-cbor-util"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"

Expand Down Expand Up @@ -126,7 +127,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
}

writeDeadline := 60 * time.Second
_ = s.SetDeadline(time.Now().Add(writeDeadline))
_ = s.SetDeadline(build.Clock.Now().Add(writeDeadline))
raulk marked this conversation as resolved.
Show resolved Hide resolved
if err := cborutil.WriteCborRPC(s, resp); err != nil {
log.Warnw("failed to write back response for handle stream", "err", err, "peer", s.Conn().RemotePeer())
return
Expand Down
21 changes: 11 additions & 10 deletions chain/blocksync/blocksync_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"golang.org/x/xerrors"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
incrt "github.com/filecoin-project/lotus/lib/increadtimeout"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
// randomize the first few peers so we don't always pick the same peer
shufflePrefix(peers)

start := time.Now()
start := build.Clock.Now()
var oerr error

for _, p := range peers {
Expand All @@ -117,7 +118,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
if err != nil {
return nil, xerrors.Errorf("success response from peer failed to process: %w", err)
}
bs.syncPeers.logGlobalSuccess(time.Since(start))
bs.syncPeers.logGlobalSuccess(build.Clock.Since(start))
bs.host.ConnManager().TagPeer(p, "bsync", 25)
return resp, nil
}
Expand Down Expand Up @@ -197,7 +198,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
}

var err error
start := time.Now()
start := build.Clock.Now()

for _, p := range peers {
res, rerr := bs.sendRequestToPeer(ctx, p, req)
Expand All @@ -208,7 +209,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
}

if res.Status == StatusOK {
bs.syncPeers.logGlobalSuccess(time.Since(start))
bs.syncPeers.logGlobalSuccess(build.Clock.Since(start))
return res.Chain, nil
}

Expand Down Expand Up @@ -284,25 +285,25 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B
ctx, span := trace.StartSpan(ctx, "blockSyncFetch")
defer span.End()

start := time.Now()
start := build.Clock.Now()
s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
if err != nil {
bs.RemovePeer(p)
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
}
_ = s.SetWriteDeadline(time.Now().Add(5 * time.Second))
_ = s.SetWriteDeadline(build.Clock.Now().Add(5 * time.Second))
raulk marked this conversation as resolved.
Show resolved Hide resolved

if err := cborutil.WriteCborRPC(s, req); err != nil {
_ = s.SetWriteDeadline(time.Time{})
bs.syncPeers.logFailure(p, time.Since(start))
bs.syncPeers.logFailure(p, build.Clock.Since(start))
return nil, err
}
_ = s.SetWriteDeadline(time.Time{})

var res BlockSyncResponse
r := incrt.New(s, 50<<10, 5*time.Second)
if err := cborutil.ReadCborRPC(bufio.NewReader(r), &res); err != nil {
bs.syncPeers.logFailure(p, time.Since(start))
bs.syncPeers.logFailure(p, build.Clock.Since(start))
return nil, err
}

Expand All @@ -314,7 +315,7 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B
)
}

bs.syncPeers.logSuccess(p, time.Since(start))
bs.syncPeers.logSuccess(p, build.Clock.Since(start))
return &res, nil
}

Expand Down Expand Up @@ -475,7 +476,7 @@ func (bpt *bsPeerTracker) addPeer(p peer.ID) {
return
}
bpt.peers[p] = &peerStats{
firstSeen: time.Now(),
firstSeen: build.Clock.Now(),
}

}
Expand Down
2 changes: 1 addition & 1 deletion chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (e *Events) listenHeadChanges(ctx context.Context) {
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
time.Sleep(time.Second)
build.Clock.Sleep(time.Second)
log.Info("restarting listenHeadChanges")
}
}
Expand Down
2 changes: 1 addition & 1 deletion chain/gen/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
*genm2,
},
NetworkName: "",
Timestamp: uint64(time.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()),
Timestamp: uint64(build.Clock.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()),
}

genb, err := genesis2.MakeGenesisBlock(context.TODO(), bs, sys, tpl)
Expand Down
7 changes: 5 additions & 2 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/node/modules/dtypes"

"github.com/raulk/clock"
)

var log = logging.Logger("messagepool")
Expand Down Expand Up @@ -66,7 +69,7 @@ type MessagePool struct {
lk sync.Mutex

closer chan struct{}
repubTk *time.Ticker
repubTk *clock.Ticker

localAddrs map[address.Address]struct{}

Expand Down Expand Up @@ -187,7 +190,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa

mp := &MessagePool{
closer: make(chan struct{}),
repubTk: time.NewTicker(time.Duration(build.BlockDelaySecs) * 10 * time.Second),
repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second),
localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet),
minGasPrice: types.NewInt(0),
Expand Down
6 changes: 3 additions & 3 deletions chain/metrics/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package metrics
import (
"context"
"encoding/json"
"time"

"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/helpers"
Expand Down Expand Up @@ -89,7 +89,7 @@ func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain
}

// using unix nano time makes very sure we pick a nonce higher than previous restart
nonce := uint64(time.Now().UnixNano())
nonce := uint64(build.Clock.Now().UnixNano())

for {
select {
Expand All @@ -107,7 +107,7 @@ func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain
Height: n.Val.Height(),
Weight: w,
NodeName: nickname,
Time: uint64(time.Now().UnixNano() / 1000_000),
Time: uint64(build.Clock.Now().UnixNano() / 1000_000),
Nonce: nonce,
}

Expand Down
Loading