From 13de81b3b27926fd5db1e7334d540f8e6d72c774 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 10 Jul 2020 15:43:14 +0100 Subject: [PATCH 1/3] introduce the ability to mock time. --- api/test/window_post.go | 16 ++++++----- build/clock.go | 10 +++++++ chain/beacon/beacon.go | 9 +++--- chain/beacon/drand/drand.go | 5 ++-- chain/block_receipt_tracker.go | 5 ++-- chain/blocksync/blocksync.go | 3 +- chain/blocksync/blocksync_client.go | 21 +++++++------- chain/events/events.go | 2 +- chain/gen/gen.go | 2 +- chain/messagepool/messagepool.go | 7 +++-- chain/metrics/consensus.go | 6 ++-- chain/sub/incoming.go | 12 ++++---- chain/sync.go | 8 +++--- chain/syncstate.go | 7 +++-- chain/vm/runtime.go | 5 ++-- chain/vm/vm.go | 5 ++-- cli/sync.go | 2 +- cmd/chain-noise/main.go | 3 +- go.mod | 1 + go.sum | 2 ++ journal/journal.go | 6 ++-- lib/increadtimeout/incrt.go | 8 +++--- lib/peermgr/peermgr.go | 3 +- miner/miner.go | 44 ++++++++++++++--------------- node/hello/hello.go | 13 +++++---- node/modules/testing/genesis.go | 4 +-- storage/miner.go | 4 +-- storage/wdpost_run.go | 2 +- storage/wdpost_sched.go | 3 +- tools/stats/metrics.go | 4 +-- tools/stats/rpc.go | 6 ++-- 31 files changed, 129 insertions(+), 99 deletions(-) create mode 100644 build/clock.go diff --git a/api/test/window_post.go b/api/test/window_post.go index dcf6fcebd64..874bcadcfa7 100644 --- a/api/test/window_post.go +++ b/api/test/window_post.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" + "os" "strings" "testing" @@ -35,14 +37,14 @@ func TestPledgeSector(t *testing.T, b APIBuilder, blocktime time.Duration, nSect if err := miner.NetConnect(ctx, addrinfo); err != nil { t.Fatal(err) } - time.Sleep(time.Second) + build.Clock.Sleep(time.Second) mine := true done := make(chan struct{}) go func() { defer close(done) for mine { - time.Sleep(blocktime) + build.Clock.Sleep(blocktime) if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil { t.Error(err) } @@ -69,7 +71,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n i break } - time.Sleep(100 * time.Millisecond) + build.Clock.Sleep(100 * time.Millisecond) } fmt.Printf("All sectors is fsm\n") @@ -94,7 +96,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n i } } - time.Sleep(100 * time.Millisecond) + build.Clock.Sleep(100 * time.Millisecond) fmt.Printf("WaitSeal: %d\n", len(s)) } } @@ -115,14 +117,14 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector if err := miner.NetConnect(ctx, addrinfo); err != nil { t.Fatal(err) } - time.Sleep(time.Second) + build.Clock.Sleep(time.Second) mine := true done := make(chan struct{}) go func() { defer close(done) for mine { - time.Sleep(blocktime) + build.Clock.Sleep(blocktime) if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil { t.Error(err) } @@ -150,7 +152,7 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector if head.Height()%100 == 0 { fmt.Printf("@%d\n", head.Height()) } - time.Sleep(blocktime) + build.Clock.Sleep(blocktime) } p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK) diff --git a/build/clock.go b/build/clock.go new file mode 100644 index 00000000000..5b172672053 --- /dev/null +++ b/build/clock.go @@ -0,0 +1,10 @@ +package build + +import "github.com/raulk/clock" + +// Clock is the global clock for the system. In standard builds, +// we use a real-time clock, which maps to the `time` package. +// +// Tests that need control of time can replace this variable with +// clock.NewMock(). +var Clock = clock.New() diff --git a/chain/beacon/beacon.go b/chain/beacon/beacon.go index 2be2e7f1ce2..7b998c04f05 100644 --- a/chain/beacon/beacon.go +++ b/chain/beacon/beacon.go @@ -2,12 +2,13 @@ package beacon import ( "context" - "time" - "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/specs-actors/actors/abi" logging "github.com/ipfs/go-log" "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" ) var log = logging.Logger("beacon") @@ -52,7 +53,7 @@ func ValidateBlockValues(b RandomBeacon, h *types.BlockHeader, prevEntry types.B } func BeaconEntriesForBlock(ctx context.Context, beacon RandomBeacon, round abi.ChainEpoch, prev types.BeaconEntry) ([]types.BeaconEntry, error) { - start := time.Now() + start := build.Clock.Now() maxRound := beacon.MaxBeaconRoundForEpoch(round, prev) if maxRound == prev.Round { @@ -81,7 +82,7 @@ func BeaconEntriesForBlock(ctx context.Context, beacon RandomBeacon, round abi.C } } - log.Debugw("fetching beacon entries", "took", time.Since(start), "numEntries", len(out)) + log.Debugw("fetching beacon entries", "took", build.Clock.Since(start), "numEntries", len(out)) reverse(out) return out, nil } diff --git a/chain/beacon/drand/drand.go b/chain/beacon/drand/drand.go index 00ff05f8102..1b036bbf0ef 100644 --- a/chain/beacon/drand/drand.go +++ b/chain/beacon/drand/drand.go @@ -21,6 +21,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -131,7 +132,7 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re } go func() { - start := time.Now() + start := build.Clock.Now() log.Infow("start fetching randomness", "round", round) resp, err := db.client.Get(ctx, round) @@ -142,7 +143,7 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re br.Entry.Round = resp.Round() br.Entry.Data = resp.Signature() } - log.Infow("done fetching randomness", "round", round, "took", time.Since(start)) + log.Infow("done fetching randomness", "round", round, "took", build.Clock.Since(start)) out <- br close(out) }() diff --git a/chain/block_receipt_tracker.go b/chain/block_receipt_tracker.go index f182fd18069..466adef9de9 100644 --- a/chain/block_receipt_tracker.go +++ b/chain/block_receipt_tracker.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" "github.com/hashicorp/golang-lru" peer "github.com/libp2p/go-libp2p-core/peer" @@ -37,14 +38,14 @@ func (brt *blockReceiptTracker) Add(p peer.ID, ts *types.TipSet) { if !ok { pset := &peerSet{ peers: map[peer.ID]time.Time{ - p: time.Now(), + p: build.Clock.Now(), }, } brt.cache.Add(ts.Key(), pset) return } - val.(*peerSet).peers[p] = time.Now() + val.(*peerSet).peers[p] = build.Clock.Now() } func (brt *blockReceiptTracker) GetPeers(ts *types.TipSet) []peer.ID { diff --git a/chain/blocksync/blocksync.go b/chain/blocksync/blocksync.go index a9251c41927..53b9feeff15 100644 --- a/chain/blocksync/blocksync.go +++ b/chain/blocksync/blocksync.go @@ -11,6 +11,7 @@ import ( cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" @@ -126,7 +127,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) { } writeDeadline := 60 * time.Second - _ = s.SetDeadline(time.Now().Add(writeDeadline)) + _ = s.SetDeadline(build.Clock.Now().Add(writeDeadline)) if err := cborutil.WriteCborRPC(s, resp); err != nil { log.Warnw("failed to write back response for handle stream", "err", err, "peer", s.Conn().RemotePeer()) return diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go index daa4b6335c7..ac8bcc7a18c 100644 --- a/chain/blocksync/blocksync_client.go +++ b/chain/blocksync/blocksync_client.go @@ -21,6 +21,7 @@ import ( "golang.org/x/xerrors" cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" incrt "github.com/filecoin-project/lotus/lib/increadtimeout" @@ -91,7 +92,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i // randomize the first few peers so we don't always pick the same peer shufflePrefix(peers) - start := time.Now() + start := build.Clock.Now() var oerr error for _, p := range peers { @@ -117,7 +118,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i if err != nil { return nil, xerrors.Errorf("success response from peer failed to process: %w", err) } - bs.syncPeers.logGlobalSuccess(time.Since(start)) + bs.syncPeers.logGlobalSuccess(build.Clock.Since(start)) bs.host.ConnManager().TagPeer(p, "bsync", 25) return resp, nil } @@ -197,7 +198,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun } var err error - start := time.Now() + start := build.Clock.Now() for _, p := range peers { res, rerr := bs.sendRequestToPeer(ctx, p, req) @@ -208,7 +209,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun } if res.Status == StatusOK { - bs.syncPeers.logGlobalSuccess(time.Since(start)) + bs.syncPeers.logGlobalSuccess(build.Clock.Since(start)) return res.Chain, nil } @@ -284,17 +285,17 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B ctx, span := trace.StartSpan(ctx, "blockSyncFetch") defer span.End() - start := time.Now() + start := build.Clock.Now() s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID) if err != nil { bs.RemovePeer(p) return nil, xerrors.Errorf("failed to open stream to peer: %w", err) } - _ = s.SetWriteDeadline(time.Now().Add(5 * time.Second)) + _ = s.SetWriteDeadline(build.Clock.Now().Add(5 * time.Second)) if err := cborutil.WriteCborRPC(s, req); err != nil { _ = s.SetWriteDeadline(time.Time{}) - bs.syncPeers.logFailure(p, time.Since(start)) + bs.syncPeers.logFailure(p, build.Clock.Since(start)) return nil, err } _ = s.SetWriteDeadline(time.Time{}) @@ -302,7 +303,7 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B var res BlockSyncResponse r := incrt.New(s, 50<<10, 5*time.Second) if err := cborutil.ReadCborRPC(bufio.NewReader(r), &res); err != nil { - bs.syncPeers.logFailure(p, time.Since(start)) + bs.syncPeers.logFailure(p, build.Clock.Since(start)) return nil, err } @@ -314,7 +315,7 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B ) } - bs.syncPeers.logSuccess(p, time.Since(start)) + bs.syncPeers.logSuccess(p, build.Clock.Since(start)) return &res, nil } @@ -475,7 +476,7 @@ func (bpt *bsPeerTracker) addPeer(p peer.ID) { return } bpt.peers[p] = &peerStats{ - firstSeen: time.Now(), + firstSeen: build.Clock.Now(), } } diff --git a/chain/events/events.go b/chain/events/events.go index e1150779594..4550fc98a9b 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -99,7 +99,7 @@ func (e *Events) listenHeadChanges(ctx context.Context) { log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err()) return } - time.Sleep(time.Second) + build.Clock.Sleep(time.Second) log.Info("restarting listenHeadChanges") } } diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 31d352ed481..3a0ac2187e2 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -196,7 +196,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) { *genm2, }, NetworkName: "", - Timestamp: uint64(time.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()), + Timestamp: uint64(build.Clock.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()), } genb, err := genesis2.MakeGenesisBlock(context.TODO(), bs, sys, tpl) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index b8ac55c5907..2939dba13cd 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -23,12 +23,15 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/node/modules/dtypes" + + "github.com/raulk/clock" ) var log = logging.Logger("messagepool") @@ -66,7 +69,7 @@ type MessagePool struct { lk sync.Mutex closer chan struct{} - repubTk *time.Ticker + repubTk *clock.Ticker localAddrs map[address.Address]struct{} @@ -187,7 +190,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa mp := &MessagePool{ closer: make(chan struct{}), - repubTk: time.NewTicker(time.Duration(build.BlockDelaySecs) * 10 * time.Second), + repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second), localAddrs: make(map[address.Address]struct{}), pending: make(map[address.Address]*msgSet), minGasPrice: types.NewInt(0), diff --git a/chain/metrics/consensus.go b/chain/metrics/consensus.go index bc7e019d219..7d19d5bd6ae 100644 --- a/chain/metrics/consensus.go +++ b/chain/metrics/consensus.go @@ -3,7 +3,6 @@ package metrics import ( "context" "encoding/json" - "time" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/ipfs/go-cid" @@ -11,6 +10,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/helpers" @@ -89,7 +89,7 @@ func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain } // using unix nano time makes very sure we pick a nonce higher than previous restart - nonce := uint64(time.Now().UnixNano()) + nonce := uint64(build.Clock.Now().UnixNano()) for { select { @@ -107,7 +107,7 @@ func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain Height: n.Val.Height(), Weight: w, NodeName: nickname, - Time: uint64(time.Now().UnixNano() / 1000_000), + Time: uint64(build.Clock.Now().UnixNano() / 1000_000), Nonce: nonce, } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 9dfc2d89455..0db1325625f 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -61,7 +61,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha src := msg.GetFrom() go func() { - start := time.Now() + start := build.Clock.Now() log.Debug("about to fetch messages for block from pubsub") bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages) if err != nil { @@ -75,9 +75,9 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha return } - took := time.Since(start) + took := build.Clock.Since(start) log.Infow("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took) - if delay := time.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 { + if delay := build.Clock.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 { log.Warnf("Received block with large delay %d from miner %s", delay, blk.Header.Miner) } @@ -142,9 +142,9 @@ func (bv *BlockValidator) flagPeer(p peer.ID) { func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { // track validation time - begin := time.Now() + begin := build.Clock.Now() defer func() { - log.Debugf("block validation time: %s", time.Since(begin)) + log.Debugf("block validation time: %s", build.Clock.Since(begin)) }() stats.Record(ctx, metrics.BlockReceived.M(1)) @@ -233,7 +233,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub func (bv *BlockValidator) isChainNearSynced() bool { ts := bv.chain.GetHeaviestTipSet() timestamp := ts.MinTimestamp() - now := time.Now().UnixNano() + now := build.Clock.Now().UnixNano() cutoff := uint64(now) - uint64(6*time.Hour) return timestamp > cutoff } diff --git a/chain/sync.go b/chain/sync.go index bcfa71267f2..df067f1e9ed 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -620,7 +620,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) (er return nil } - validationStart := time.Now() + validationStart := build.Clock.Now() defer func() { dur := time.Since(validationStart) durMilli := dur.Seconds() * float64(1000) @@ -665,12 +665,12 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) (er // fast checks first - now := uint64(time.Now().Unix()) + now := uint64(build.Clock.Now().Unix()) if h.Timestamp > now+build.AllowableClockDriftSecs { return xerrors.Errorf("block was from the future (now=%d, blk=%d): %w", now, h.Timestamp, ErrTemporal) } if h.Timestamp > now { - log.Warn("Got block from the future, but within threshold", h.Timestamp, time.Now().Unix()) + log.Warn("Got block from the future, but within threshold", h.Timestamp, build.Clock.Now().Unix()) } if h.Timestamp < baseTs.MinTimestamp()+(build.BlockDelaySecs*uint64(h.Height-baseTs.Height())) { @@ -1538,6 +1538,6 @@ func (syncer *Syncer) IsEpochBeyondCurrMax(epoch abi.ChainEpoch) bool { return false } - now := uint64(time.Now().Unix()) + now := uint64(build.Clock.Now().Unix()) return epoch > (abi.ChainEpoch((now-g.Timestamp)/build.BlockDelaySecs) + MaxHeightDrift) } diff --git a/chain/syncstate.go b/chain/syncstate.go index b213c748314..aaca8830314 100644 --- a/chain/syncstate.go +++ b/chain/syncstate.go @@ -8,6 +8,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" ) @@ -48,7 +49,7 @@ func (ss *SyncerState) SetStage(v api.SyncStateStage) { defer ss.lk.Unlock() ss.Stage = v if v == api.StageSyncComplete { - ss.End = time.Now() + ss.End = build.Clock.Now() } } @@ -64,7 +65,7 @@ func (ss *SyncerState) Init(base, target *types.TipSet) { ss.Stage = api.StageHeaders ss.Height = 0 ss.Message = "" - ss.Start = time.Now() + ss.Start = build.Clock.Now() ss.End = time.Time{} } @@ -87,7 +88,7 @@ func (ss *SyncerState) Error(err error) { defer ss.lk.Unlock() ss.Message = err.Error() ss.Stage = api.StageSyncErrored - ss.End = time.Now() + ss.End = build.Clock.Now() } func (ss *SyncerState) Snapshot() SyncerState { diff --git a/chain/vm/runtime.go b/chain/vm/runtime.go index 67b1c9d9d77..a160e8455a5 100644 --- a/chain/vm/runtime.go +++ b/chain/vm/runtime.go @@ -373,8 +373,7 @@ func (rt *Runtime) Send(to address.Address, method abi.MethodNum, m vmr.CBORMars } func (rt *Runtime) internalSend(from, to address.Address, method abi.MethodNum, value types.BigInt, params []byte) ([]byte, aerrors.ActorError) { - - start := time.Now() + start := build.Clock.Now() ctx, span := trace.StartSpan(rt.ctx, "vmc.Send") defer span.End() if span.IsRecordingEvents() { @@ -528,7 +527,7 @@ func (rt *Runtime) chargeGasInternal(gas GasCharge, skip int) aerrors.ActorError var callers [10]uintptr cout := gruntime.Callers(2+skip, callers[:]) - now := time.Now() + now := build.Clock.Now() if rt.lastGasCharge != nil { rt.lastGasCharge.TimeTaken = now.Sub(rt.lastGasChargeTime) } diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 2b22fbf1003..b46a9236bda 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -28,6 +28,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/runtime" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/aerrors" "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/types" @@ -285,7 +286,7 @@ func checkMessage(msg *types.Message) error { } func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { - start := time.Now() + start := build.Clock.Now() ret, actorErr, rt := vm.send(ctx, msg, nil, nil, start) rt.finilizeGasTracing() return &ApplyRet{ @@ -302,7 +303,7 @@ func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*Ap } func (vm *VM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) { - start := time.Now() + start := build.Clock.Now() ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage") defer span.End() msg := cmsg.VMMessage() diff --git a/cli/sync.go b/cli/sync.go index fbb69a87033..57553a06800 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -195,7 +195,7 @@ func SyncWait(ctx context.Context, napi api.FullNode) error { case <-ctx.Done(): fmt.Println("\nExit by user") return nil - case <-time.After(1 * time.Second): + case <-build.Clock.After(1 * time.Second): } } } diff --git a/cmd/chain-noise/main.go b/cmd/chain-noise/main.go index a1e92ee9625..4a4a099d8a8 100644 --- a/cmd/chain-noise/main.go +++ b/cmd/chain-noise/main.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" @@ -68,7 +69,7 @@ func sendSmallFundsTxs(ctx context.Context, api api.FullNode, from address.Addre sendSet = append(sendSet, naddr) } - tick := time.NewTicker(time.Second / time.Duration(rate)) + tick := build.Clock.Ticker(time.Second / time.Duration(rate)) for { select { case <-tick.C: diff --git a/go.mod b/go.mod index aca92c57f23..8a907eb6a81 100644 --- a/go.mod +++ b/go.mod @@ -103,6 +103,7 @@ require ( github.com/multiformats/go-multibase v0.0.3 github.com/multiformats/go-multihash v0.0.13 github.com/opentracing/opentracing-go v1.1.0 + github.com/raulk/clock v1.1.0 github.com/stretchr/objx v0.2.0 // indirect github.com/stretchr/testify v1.6.1 github.com/syndtr/goleveldb v1.0.0 diff --git a/go.sum b/go.sum index e7e68404e9c..854069815b7 100644 --- a/go.sum +++ b/go.sum @@ -1221,6 +1221,8 @@ github.com/prometheus/procfs v0.0.11 h1:DhHlBtkHWPYi8O2y31JkK0TF+DGM+51OopZjH/Ia github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.1.0 h1:jhMy6QXfi3y2HEzFoyuCj40z4OZIIHHPtFyCMftmvKA= github.com/prometheus/procfs v0.1.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= +github.com/raulk/clock v1.1.0 h1:dpb29+UKMbLqiU/jqIJptgLR1nn23HLgMY0sTCDza5Y= +github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/journal/journal.go b/journal/journal.go index b664e8fa755..8d509d51ccd 100644 --- a/journal/journal.go +++ b/journal/journal.go @@ -10,6 +10,8 @@ import ( logging "github.com/ipfs/go-log" "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/build" ) func InitializeSystemJournal(dir string) error { @@ -103,7 +105,7 @@ func (fsj *fsJournal) rollJournalFile() error { fsj.fi.Close() } - nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", time.Now().Format(time.RFC3339)))) + nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(time.RFC3339)))) if err != nil { return xerrors.Errorf("failed to open journal file: %w", err) } @@ -130,7 +132,7 @@ func (fsj *fsJournal) runLoop() { func (fsj *fsJournal) AddEntry(system string, obj interface{}) { je := &JournalEntry{ System: system, - Timestamp: time.Now(), + Timestamp: build.Clock.Now(), Val: obj, } select { diff --git a/lib/increadtimeout/incrt.go b/lib/increadtimeout/incrt.go index 0b9c65d4df6..dd5a05ff80c 100644 --- a/lib/increadtimeout/incrt.go +++ b/lib/increadtimeout/incrt.go @@ -5,12 +5,12 @@ import ( "time" logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/lotus/build" ) var log = logging.Logger("incrt") -var now = time.Now - type ReaderDeadline interface { Read([]byte) (int, error) SetReadDeadline(time.Time) error @@ -45,7 +45,7 @@ func (err errNoWait) Timeout() bool { } func (crt *incrt) Read(buf []byte) (int, error) { - start := now() + start := build.Clock.Now() if crt.wait == 0 { return 0, errNoWait{} } @@ -59,7 +59,7 @@ func (crt *incrt) Read(buf []byte) (int, error) { _ = crt.rd.SetReadDeadline(time.Time{}) if err == nil { - dur := now().Sub(start) + dur := build.Clock.Now().Sub(start) crt.wait -= dur crt.wait += time.Duration(n) * crt.waitPerByte if crt.wait < 0 { diff --git a/lib/peermgr/peermgr.go b/lib/peermgr/peermgr.go index 490f2b22948..d275be3d53d 100644 --- a/lib/peermgr/peermgr.go +++ b/lib/peermgr/peermgr.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/modules/dtypes" "go.opencensus.io/stats" @@ -107,7 +108,7 @@ func (pmgr *PeerMgr) Disconnect(p peer.ID) { } func (pmgr *PeerMgr) Run(ctx context.Context) { - tick := time.NewTicker(time.Second * 5) + tick := build.Clock.Ticker(time.Second * 5) for { select { case <-tick.C: diff --git a/miner/miner.go b/miner/miner.go index d81216be25a..8c46be67500 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -46,7 +46,7 @@ func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address) waitFunc: func(ctx context.Context, baseTime uint64) (func(bool, error), error) { // Wait around for half the block time in case other parents come in deadline := baseTime + build.PropagationDelaySecs - time.Sleep(time.Until(time.Unix(int64(deadline), 0))) + build.Clock.Sleep(build.Clock.Until(time.Unix(int64(deadline), 0))) return func(bool, error) {}, nil }, @@ -107,7 +107,7 @@ func (m *Miner) Stop(ctx context.Context) error { func (m *Miner) niceSleep(d time.Duration) bool { select { - case <-time.After(d): + case <-build.Clock.After(d): return true case <-m.stop: return false @@ -170,14 +170,14 @@ func (m *Miner) mine(ctx context.Context) { if b != nil { btime := time.Unix(int64(b.Header.Timestamp), 0) - if time.Now().Before(btime) { - if !m.niceSleep(time.Until(btime)) { + if build.Clock.Now().Before(btime) { + if !m.niceSleep(build.Clock.Until(btime)) { log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out") - time.Sleep(time.Until(btime)) + build.Clock.Sleep(build.Clock.Until(btime)) } } else { log.Warnw("mined block in the past", "block-time", btime, - "time", time.Now(), "duration", time.Since(btime)) + "time", build.Clock.Now(), "duration", build.Clock.Since(btime)) } // TODO: should do better 'anti slash' protection here @@ -201,7 +201,7 @@ func (m *Miner) mine(ctx context.Context) { nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+build.BlockDelaySecs*uint64(base.NullRounds))+int64(build.PropagationDelaySecs), 0) select { - case <-time.After(time.Until(nextRound)): + case <-build.Clock.After(build.Clock.Until(nextRound)): case <-m.stop: stopping := m.stopping m.stop = nil @@ -271,7 +271,7 @@ func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.Ti // 1. func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) { log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids())) - start := time.Now() + start := build.Clock.Now() round := base.TipSet.Height() + base.NullRounds + 1 @@ -283,11 +283,11 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, return nil, nil } - tMBI := time.Now() + tMBI := build.Clock.Now() beaconPrev := mbi.PrevBeaconEntry - tDrand := time.Now() + tDrand := build.Clock.Now() bvals := mbi.BeaconEntries hasPower, err := m.hasPower(ctx, m.address, base.TipSet) @@ -299,9 +299,9 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, return nil, nil } - tPowercheck := time.Now() + tPowercheck := build.Clock.Now() - log.Infof("Time delta between now and our mining base: %ds (nulls: %d)", uint64(time.Now().Unix())-base.TipSet.MinTimestamp(), base.NullRounds) + log.Infof("Time delta between now and our mining base: %ds (nulls: %d)", uint64(build.Clock.Now().Unix())-base.TipSet.MinTimestamp(), base.NullRounds) rbase := beaconPrev if len(bvals) > 0 { @@ -322,7 +322,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, return nil, nil } - tTicket := time.Now() + tTicket := build.Clock.Now() buf := new(bytes.Buffer) if err := m.address.MarshalCBOR(buf); err != nil { @@ -336,7 +336,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, prand := abi.PoStRandomness(rand) - tSeed := time.Now() + tSeed := build.Clock.Now() postProof, err := m.epp.ComputeProof(ctx, mbi.Sectors, prand) if err != nil { @@ -349,7 +349,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, return nil, xerrors.Errorf("failed to get pending messages: %w", err) } - tPending := time.Now() + tPending := build.Clock.Now() // TODO: winning post proof b, err := m.createBlock(base, m.address, ticket, winner, bvals, postProof, pending) @@ -357,7 +357,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, return nil, xerrors.Errorf("failed to create block: %w", err) } - tCreateBlock := time.Now() + tCreateBlock := build.Clock.Now() dur := tCreateBlock.Sub(start) log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height, "took", dur) if dur > time.Second*time.Duration(build.BlockDelaySecs) { @@ -502,7 +502,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs tooLowFundMsgs := 0 tooHighNonceMsgs := 0 - start := time.Now() + start := build.Clock.Now() vmValid := time.Duration(0) getbal := time.Duration(0) @@ -511,7 +511,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs }) for _, msg := range msgs { - vmstart := time.Now() + vmstart := build.Clock.Now() minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil { @@ -519,7 +519,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs continue } - vmValid += time.Since(vmstart) + vmValid += build.Clock.Since(vmstart) // TODO: this should be in some more general 'validate message' call if msg.Message.GasLimit > build.BlockGasLimit { @@ -534,7 +534,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs from := msg.Message.From - getBalStart := time.Now() + getBalStart := build.Clock.Now() if _, ok := inclNonces[from]; !ok { act, err := al(ctx, from, ts.Key()) if err != nil { @@ -545,7 +545,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs inclNonces[from] = act.Nonce inclBalances[from] = act.Balance } - getbal += time.Since(getBalStart) + getbal += build.Clock.Since(getBalStart) if inclBalances[from].LessThan(msg.Message.RequiredFunds()) { tooLowFundMsgs++ @@ -648,7 +648,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs log.Warnf("%d messages in mempool had too high nonce", tooHighNonceMsgs) } - sm := time.Now() + sm := build.Clock.Now() if sm.Sub(start) > time.Second { log.Warnw("SelectMessages took a long time", "duration", sm.Sub(start), diff --git a/node/hello/hello.go b/node/hello/hello.go index a2a5fbd4e65..5f35faafc57 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -15,6 +15,7 @@ import ( protocol "github.com/libp2p/go-libp2p-core/protocol" cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" @@ -67,7 +68,7 @@ func (hs *Service) HandleStream(s inet.Stream) { _ = s.Conn().Close() return } - arrived := time.Now() + arrived := build.Clock.Now() log.Debugw("genesis from hello", "tipset", hmsg.HeaviestTipSet, @@ -82,7 +83,7 @@ func (hs *Service) HandleStream(s inet.Stream) { go func() { defer s.Close() //nolint:errcheck - sent := time.Now() + sent := build.Clock.Now() msg := &LatencyMessage{ TArrial: arrived.UnixNano(), TSent: sent.UnixNano(), @@ -99,7 +100,7 @@ func (hs *Service) HandleStream(s inet.Stream) { if len(protos) == 0 { log.Warn("other peer hasnt completed libp2p identify, waiting a bit") // TODO: this better - time.Sleep(time.Millisecond * 300) + build.Clock.Sleep(time.Millisecond * 300) } ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), types.NewTipSetKey(hmsg.HeaviestTipSet...)) @@ -146,7 +147,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { } log.Debug("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid()) - t0 := time.Now() + t0 := build.Clock.Now() if err := cborutil.WriteCborRPC(s, hmsg); err != nil { return err } @@ -155,13 +156,13 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { defer s.Close() //nolint:errcheck lmsg := &LatencyMessage{} - _ = s.SetReadDeadline(time.Now().Add(10 * time.Second)) + _ = s.SetReadDeadline(build.Clock.Now().Add(10 * time.Second)) err := cborutil.ReadCborRPC(s, lmsg) if err != nil { log.Infow("reading latency message", "error", err) } - t3 := time.Now() + t3 := build.Clock.Now() lat := t3.Sub(t0) // add to peer tracker if hs.pmgr != nil { diff --git a/node/modules/testing/genesis.go b/node/modules/testing/genesis.go index 930c11fc8ea..41587ed722d 100644 --- a/node/modules/testing/genesis.go +++ b/node/modules/testing/genesis.go @@ -7,7 +7,6 @@ import ( "io" "io/ioutil" "os" - "time" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" @@ -20,6 +19,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/runtime" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/gen" genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis" "github.com/filecoin-project/lotus/chain/types" @@ -71,7 +71,7 @@ func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore } if template.Timestamp == 0 { - template.Timestamp = uint64(time.Now().Unix()) + template.Timestamp = uint64(build.Clock.Now().Unix()) } b, err := genesis2.MakeGenesisBlock(context.TODO(), bs, syscalls, template) diff --git a/storage/miner.go b/storage/miner.go index a0e2c9225ec..42a4369883e 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -167,7 +167,7 @@ func NewWinningPoStProver(api api.FullNode, prover storage.Prover, verifier ffiw var _ gen.WinningPoStProver = (*StorageWpp)(nil) func (wpp *StorageWpp) GenerateCandidates(ctx context.Context, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) { - start := time.Now() + start := build.Clock.Now() cds, err := wpp.verifier.GenerateWinningPoStSectorChallenge(ctx, wpp.winnRpt, wpp.miner, randomness, eligibleSectorCount) if err != nil { @@ -185,7 +185,7 @@ func (wpp *StorageWpp) ComputeProof(ctx context.Context, ssi []abi.SectorInfo, r log.Infof("Computing WinningPoSt ;%+v; %v", ssi, rand) - start := time.Now() + start := build.Clock.Now() proof, err := wpp.prover.GenerateWinningPoSt(ctx, wpp.miner, ssi, rand) if err != nil { return nil, err diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index f6ec6458362..9f3919240c3 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -430,7 +430,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo snums = append(snums, si.SectorNumber) } - tsStart := time.Now() + tsStart := build.Clock.Now() log.Infow("generating windowPost", "sectors", len(ssi)) diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 077209a4eb6..8e3221ef978 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) @@ -85,7 +86,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { if err != nil { log.Errorf("ChainNotify error: %+v") - time.Sleep(10 * time.Second) + build.Clock.Sleep(10 * time.Second) continue } diff --git a/tools/stats/metrics.go b/tools/stats/metrics.go index 62636373147..3abb4b13b7f 100644 --- a/tools/stats/metrics.go +++ b/tools/stats/metrics.go @@ -66,7 +66,7 @@ func NewInfluxWriteQueue(ctx context.Context, influx client.Client) *InfluxWrite for i := 0; i < maxRetries; i++ { if err := influx.Write(batch); err != nil { log.Warnw("Failed to write batch", "error", err) - time.Sleep(time.Second * 15) + build.Clock.Sleep(15 * time.Second) continue } @@ -104,7 +104,7 @@ func InfluxNewBatch() (client.BatchPoints, error) { } func NewPoint(name string, value interface{}) models.Point { - pt, _ := models.NewPoint(name, models.Tags{}, map[string]interface{}{"value": value}, time.Now()) + pt, _ := models.NewPoint(name, models.Tags{}, map[string]interface{}{"value": value}, build.Clock.Now()) return pt } diff --git a/tools/stats/rpc.go b/tools/stats/rpc.go index d053ff5614c..6b6cef283f2 100644 --- a/tools/stats/rpc.go +++ b/tools/stats/rpc.go @@ -52,7 +52,7 @@ sync_complete: select { case <-ctx.Done(): return ctx.Err() - case <-time.After(5 * time.Second): + case <-build.Clock.After(5 * time.Second): state, err := napi.SyncState(ctx) if err != nil { return err @@ -97,13 +97,13 @@ sync_complete: select { case <-ctx.Done(): return ctx.Err() - case <-time.After(5 * time.Second): + case <-build.Clock.After(5 * time.Second): head, err := napi.ChainHead(ctx) if err != nil { return err } - timestampDelta := time.Now().Unix() - int64(head.MinTimestamp()) + timestampDelta := build.Clock.Now().Unix() - int64(head.MinTimestamp()) log.Infow( "Waiting for reasonable head height", From 55d88440eb5cbbd31950bdf664b50b5456372ede Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 14 Jul 2020 17:12:00 +0100 Subject: [PATCH 2/3] refine block timing log statements. --- miner/miner.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/miner/miner.go b/miner/miner.go index 8c46be67500..a305f9b7418 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -170,14 +170,18 @@ func (m *Miner) mine(ctx context.Context) { if b != nil { btime := time.Unix(int64(b.Header.Timestamp), 0) - if build.Clock.Now().Before(btime) { + now := build.Clock.Now() + switch { + case btime == now: + // block timestamp is perfectly aligned with time. + case btime.After(now): if !m.niceSleep(build.Clock.Until(btime)) { log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out") build.Clock.Sleep(build.Clock.Until(btime)) } - } else { - log.Warnw("mined block in the past", "block-time", btime, - "time", build.Clock.Now(), "duration", build.Clock.Since(btime)) + default: + log.Warnw("mined block in the past", + "block-time", btime, "time", build.Clock.Now(), "difference", build.Clock.Since(btime)) } // TODO: should do better 'anti slash' protection here From fbf6596f20744628212d4b6a5c92ded32ae0693f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 14 Jul 2020 17:42:52 +0100 Subject: [PATCH 3/3] revert to real time for socket/stream deadlines. --- chain/blocksync/blocksync.go | 3 +-- chain/blocksync/blocksync_client.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/chain/blocksync/blocksync.go b/chain/blocksync/blocksync.go index 53b9feeff15..f5483eeaeea 100644 --- a/chain/blocksync/blocksync.go +++ b/chain/blocksync/blocksync.go @@ -11,7 +11,6 @@ import ( cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" @@ -127,7 +126,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) { } writeDeadline := 60 * time.Second - _ = s.SetDeadline(build.Clock.Now().Add(writeDeadline)) + _ = s.SetDeadline(time.Now().Add(writeDeadline)) // always use real time for socket/stream deadlines. if err := cborutil.WriteCborRPC(s, resp); err != nil { log.Warnw("failed to write back response for handle stream", "err", err, "peer", s.Conn().RemotePeer()) return diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go index ac8bcc7a18c..67e4fde8f76 100644 --- a/chain/blocksync/blocksync_client.go +++ b/chain/blocksync/blocksync_client.go @@ -291,7 +291,7 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B bs.RemovePeer(p) return nil, xerrors.Errorf("failed to open stream to peer: %w", err) } - _ = s.SetWriteDeadline(build.Clock.Now().Add(5 * time.Second)) + _ = s.SetWriteDeadline(time.Now().Add(5 * time.Second)) // always use real time for socket/stream deadlines. if err := cborutil.WriteCborRPC(s, req); err != nil { _ = s.SetWriteDeadline(time.Time{})