From 664722de02618836996e6f78e43f998d8b69a1a6 Mon Sep 17 00:00:00 2001 From: Torsten Schmits Date: Mon, 15 Apr 2024 23:35:32 +0200 Subject: [PATCH] Add shutdown/restart capabilities to the peer simulator --- .../ouroboros-consensus-diffusion.cabal | 1 + .../Test/Consensus/Genesis/Tests/Uniform.hs | 54 +-- .../Consensus/PeerSimulator/NodeLifecycle.hs | 220 +++++++++++++ .../Test/Consensus/PeerSimulator/Run.hs | 308 +++++++++++------- .../Test/Consensus/PeerSimulator/StateView.hs | 14 +- .../Test/Consensus/PeerSimulator/Trace.hs | 18 +- .../Test/Consensus/PointSchedule.hs | 162 ++++++++- 7 files changed, 630 insertions(+), 147 deletions(-) create mode 100644 ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/NodeLifecycle.hs diff --git a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal index a34bb4c3dc..816a5e0d5f 100644 --- a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal +++ b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal @@ -244,6 +244,7 @@ test-suite consensus-test Test.Consensus.PeerSimulator.ChainSync Test.Consensus.PeerSimulator.Config Test.Consensus.PeerSimulator.Handlers + Test.Consensus.PeerSimulator.NodeLifecycle Test.Consensus.PeerSimulator.Resources Test.Consensus.PeerSimulator.Run Test.Consensus.PeerSimulator.ScheduledBlockFetchServer diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/Uniform.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/Uniform.hs index c519b71337..0412a1c86b 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/Uniform.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Tests/Uniform.hs @@ -18,12 +18,11 @@ module Test.Consensus.Genesis.Tests.Uniform ( import Cardano.Slotting.Slot (SlotNo (SlotNo), WithOrigin (..)) import Control.Monad (replicateM) -import Control.Monad.Class.MonadTime.SI (DiffTime, Time (Time), - addTime) +import Control.Monad.Class.MonadTime.SI (Time, addTime) import Data.List (intercalate, sort) import qualified Data.List.NonEmpty as NE import qualified Data.Map.Strict as Map -import Data.Maybe (catMaybes, fromMaybe, mapMaybe) +import Data.Maybe (fromMaybe, mapMaybe) import Data.Word (Word64) import GHC.Stack (HasCallStack) import Ouroboros.Consensus.Block.Abstract (WithOrigin (NotOrigin)) @@ -70,7 +69,13 @@ tests = adjustQuickCheckTests (`div` 10) $ testProperty "serve adversarial branches" prop_serveAdversarialBranches, adjustQuickCheckTests (`div` 100) $ - testProperty "the LoE stalls the chain, but the immutable tip is honest" prop_loeStalling + testProperty "the LoE stalls the chain, but the immutable tip is honest" prop_loeStalling, + adjustQuickCheckTests (`div` 100) $ + -- This is a crude way of ensuring that we don't get chains with more than 100 blocks, + -- because this test writes the immutable chain to disk and `instance Binary TestBlock` + -- chokes on long chains. + adjustQuickCheckMaxSize (const 10) $ + testProperty "the node is shut down and restarted after some time" prop_downtime ] theProperty :: @@ -204,22 +209,9 @@ prop_leashingAttackStalling = -- timeouts to disconnect adversaries. 
genLeashingSchedule :: GenesisTest TestBlock () -> QC.Gen (PeersSchedule TestBlock) genLeashingSchedule genesisTest = do - Peers honest advs0 <- genUniformSchedulePoints genesisTest - let peerCount = 1 + length advs0 - extendedHonest = - duplicateLastPoint (endingDelay peerCount genesisTest) <$> honest + Peers honest advs0 <- ensureScheduleDuration genesisTest <$> genUniformSchedulePoints genesisTest advs <- mapM (mapM dropRandomPoints) advs0 - pure $ Peers extendedHonest advs - - endingDelay peerCount gt = - let cst = gtChainSyncTimeouts gt - bft = gtBlockFetchTimeouts gt - in 1 + fromIntegral peerCount * maximum (0 : catMaybes - [ canAwaitTimeout cst - , intersectTimeout cst - , busyTimeout bft - , streamingTimeout bft - ]) + pure $ Peers honest advs disableBoringTimeouts gt = gt { gtChainSyncTimeouts = (gtChainSyncTimeouts gt) @@ -242,13 +234,6 @@ prop_leashingAttackStalling = let (ys, zs) = splitAt i xs in ys ++ dropElemsAt (drop 1 zs) is - duplicateLastPoint - :: DiffTime -> [(Time, SchedulePoint TestBlock)] -> [(Time, SchedulePoint TestBlock)] - duplicateLastPoint d [] = [(Time d, ScheduleTipPoint Origin)] - duplicateLastPoint d xs = - let (t, p) = last xs - in xs ++ [(addTime d t, p)] - -- | Test that the leashing attacks do not delay the immutable tip after. The -- immutable tip needs to be advanced enough when the honest peer has offered -- all of its ticks. @@ -393,3 +378,20 @@ prop_loeStalling = allTips = simpleHash . AF.headHash <$> (btTrunk : suffixes) suffixes = btbSuffix <$> btBranches + +-- | This test sets 'scDowntime', which instructs the scheduler to shut all components down whenever a tick's duration +-- is greater than 11 seconds, and restarts it while only preserving the immutable DB after advancing the time. +-- +-- This ensures that a user may shut down their machine while syncing without additional vulnerabilities. +prop_downtime :: Property +prop_downtime = forAllGenesisTest + + (genChains (QC.choose (1, 4)) `enrichedWith` \ gt -> + ensureScheduleDuration gt <$> stToGen (uniformPointsWithDowntime (gtSecurityParam gt) (gtBlockTree gt))) + + (defaultSchedulerConfig + {scEnableLoE = True, scEnableLoP = True, scDowntime = Just 11}) + + shrinkPeerSchedules + + theProperty diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/NodeLifecycle.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/NodeLifecycle.hs new file mode 100644 index 0000000000..f83d1c32b5 --- /dev/null +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/NodeLifecycle.hs @@ -0,0 +1,220 @@ +{-# LANGUAGE DisambiguateRecordFields #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Test.Consensus.PeerSimulator.NodeLifecycle ( + LiveInterval (..) + , LiveIntervalResult (..) + , LiveNode (..) + , LiveResources (..) + , NodeLifecycle (..) 
+ , lifecycleStart + , lifecycleStop + , restoreNode + ) where + +import Control.Tracer (Tracer (..), traceWith) +import Data.Functor (void) +import Data.Set (Set) +import qualified Data.Set as Set +import Ouroboros.Consensus.Block +import Ouroboros.Consensus.Config (TopLevelConfig (..)) +import Ouroboros.Consensus.Storage.ChainDB.API +import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB +import qualified Ouroboros.Consensus.Storage.ChainDB.Impl as ChainDB +import Ouroboros.Consensus.Storage.ChainDB.Impl.Args (cdbsLoE, + updateTracer) +import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Consensus.Util.ResourceRegistry +import Ouroboros.Network.AnchoredFragment (AnchoredFragment) +import qualified Ouroboros.Network.AnchoredFragment as AF +import qualified System.FS.Sim.MockFS as MockFS +import System.FS.Sim.MockFS (MockFS) +import Test.Consensus.PeerSimulator.Resources +import Test.Consensus.PeerSimulator.StateView +import Test.Consensus.PeerSimulator.Trace +import Test.Consensus.PointSchedule.Peers (PeerId) +import Test.Util.ChainDB +import Test.Util.Orphans.IOLike () +import Test.Util.TestBlock (TestBlock, testInitExtLedger) + +-- | Resources used for a single live interval of the node, constructed when the +-- node is started. +-- When the node is shut down, 'lnCopyToImmDb' is used to persist the current +-- chain. +data LiveNode blk m = LiveNode { + lnChainDb :: ChainDB m blk + , lnStateViewTracers :: StateViewTracers blk m + , lnStateTracer :: Tracer m () + + -- | Write persistent ChainDB state (the immutable and volatile DBs, but not + -- the ledger and GSM state) to the VFS TVars to preserve it for the next + -- interval. + -- Returns the immutable tip's slot for tracing. + , lnCopyToImmDb :: m (WithOrigin SlotNo) + + -- | The set of peers that should be started. + -- Based on the simulation results at node shutdown, disconnected peers are + -- removed for the next live interval. + , lnPeers :: Set PeerId + } + +-- | Result of a node shutdown at the end of a live interval. +data LiveIntervalResult blk = LiveIntervalResult { + -- | Used to initialize the 'StateViewTracers' of the next run to preserve + -- earlier disconnections for the final result. + lirPeerResults :: [PeerSimulatorResult blk] + + -- | The remaining peers, computed by removing all peers present in + -- 'lrPeerResults' from the current state in 'lnPeers'. + , lirActive :: Set PeerId + } + +-- | Resources used by the handlers 'lifecycleStart' and 'lifecycleStop' to +-- shut down running components, construct tracers used for single intervals, +-- and reset and persist state. +data LiveResources blk m = LiveResources { + lrRegistry :: ResourceRegistry m + , lrPeerSim :: PeerSimulatorResources m blk + , lrTracer :: Tracer m (TraceEvent blk) + , lrSTracer :: ChainDB m blk -> m (Tracer m ()) + , lrConfig :: TopLevelConfig blk + + -- | The chain DB state consists of several transient parts and the + -- immutable DB's virtual file system. + -- After 'lnCopyToImmDb' was executed, the latter will contain the final + -- state of an interval. + -- The rest is reset when the chain DB is recreated. + , lrCdb :: NodeDBs (StrictTVar m MockFS) + + -- | The LoE fragment must be reset for each live interval. 
+ , lrLoEVar :: LoE (StrictTVar m (AnchoredFragment (Header blk))) + } + +data LiveInterval blk m = LiveInterval { + liResources :: LiveResources blk m + , liResult :: LiveIntervalResult blk + , liNode :: LiveNode blk m + } + +-- | Handlers for starting the node and shutting it down for each live interval, +-- using the state of the previous run. +data NodeLifecycle blk m = NodeLifecycle { + -- | The minimum tick duration that triggers a node downtime. + -- If this is 'Nothing', downtimes are disabled. + nlMinDuration :: Maybe DiffTime + + -- | Start the node with prior state. + -- For the first start, this must be called with an empty 'lirPeerResults' + -- and the initial set of all peers in 'lirActive'. + , nlStart :: LiveIntervalResult blk -> m (LiveNode blk m) + , nlShutdown :: LiveNode blk m -> m (LiveIntervalResult blk) + } + +-- | Create a ChainDB and start a BlockRunner that operate on the peers' +-- candidate fragments. +mkChainDb :: + IOLike m => + LiveResources TestBlock m -> + m (ChainDB m TestBlock, m (WithOrigin SlotNo)) +mkChainDb resources = do + atomically $ do + -- Reset only the non-persisted state of the ChainDB's file system mocks: + -- - GSM state and Ledger DB are discarded + -- - Immutable DB and Volatile DB are preserved for the next interval + modifyTVar (nodeDBsGsm lrCdb) (const MockFS.empty) + modifyTVar (nodeDBsLgr lrCdb) (const MockFS.empty) + chainDbArgs <- do + let args = updateTracer + (Tracer (traceWith lrTracer . TraceChainDBEvent)) + (fromMinimalChainDbArgs MinimalChainDbArgs { + mcdbTopLevelConfig = lrConfig + , mcdbChunkInfo = mkTestChunkInfo lrConfig + , mcdbInitLedger = testInitExtLedger + , mcdbRegistry = lrRegistry + , mcdbNodeDBs = lrCdb + }) + pure $ args { ChainDB.cdbsArgs = (ChainDB.cdbsArgs args) { + cdbsLoE = readTVarIO <$> lrLoEVar + } } + (_, (chainDB, internal)) <- allocate + lrRegistry + (\_ -> ChainDB.openDBInternal chainDbArgs False) + (ChainDB.closeDB . fst) + let ChainDB.Internal {intCopyToImmutableDB, intAddBlockRunner} = internal + void $ forkLinkedThread lrRegistry "AddBlockRunner" (void intAddBlockRunner) + pure (chainDB, intCopyToImmutableDB) + where + LiveResources {lrRegistry, lrTracer, lrConfig, lrCdb, lrLoEVar} = resources + +-- | Allocate all the resources that depend on the results of previous live +-- intervals, the ChainDB and its persisted state. +restoreNode :: + IOLike m => + LiveResources TestBlock m -> + LiveIntervalResult TestBlock -> + m (LiveNode TestBlock m) +restoreNode resources LiveIntervalResult {lirPeerResults, lirActive} = do + lnStateViewTracers <- stateViewTracersWithInitial lirPeerResults + (lnChainDb, lnCopyToImmDb) <- mkChainDb resources + lnStateTracer <- lrSTracer resources lnChainDb + pure LiveNode { + lnChainDb + , lnStateViewTracers + , lnStateTracer + , lnCopyToImmDb + , lnPeers = lirActive + } + +-- | Allocate resources with 'restoreNode' and pass them to the callback that +-- starts the node's threads. +lifecycleStart :: + forall m. 
+ IOLike m => + (LiveInterval TestBlock m -> m ()) -> + LiveResources TestBlock m -> + LiveIntervalResult TestBlock -> + m (LiveNode TestBlock m) +lifecycleStart start liResources liResult = do + trace (TraceSchedulerEvent TraceNodeStartupStart) + liNode <- restoreNode liResources liResult + start LiveInterval {liResources, liResult, liNode} + chain <- atomically (ChainDB.getCurrentChain (lnChainDb liNode)) + trace (TraceSchedulerEvent (TraceNodeStartupComplete chain)) + pure liNode + where + trace = traceWith (lrTracer liResources) + +-- | Shut down the node by killing all its threads after extracting the +-- persistent state used to restart the node later. +lifecycleStop :: + (IOLike m, GetHeader blk) => + LiveResources blk m -> + LiveNode blk m -> + m (LiveIntervalResult blk) +lifecycleStop resources LiveNode {lnStateViewTracers, lnCopyToImmDb, lnPeers} = do + -- Trigger writing the immutable tip to the MockFS in our TVar for restoring in 'startNode' + immutableTip <- lnCopyToImmDb + trace (TraceSchedulerEvent (TraceNodeShutdownStart immutableTip)) + -- Remember which peers were still running before shutdown + lirPeerResults <- svtGetPeerSimulatorResults lnStateViewTracers + let disconnectedPeers = Set.fromList (psePeerId <$> lirPeerResults) + lirActive = lnPeers Set.\\ disconnectedPeers + -- Killing the peer overview threads should hopefully clean up all connections promptly + releaseAll lrRegistry + -- Reset the resources in TVars that were allocated by the simulator + atomically $ do + modifyTVar psrHandles (const mempty) + case lrLoEVar of + LoEEnabled var -> modifyTVar var (const (AF.Empty AF.AnchorGenesis)) + LoEDisabled -> pure () + trace (TraceSchedulerEvent TraceNodeShutdownComplete) + pure LiveIntervalResult {lirActive, lirPeerResults} + where + trace = traceWith lrTracer + LiveResources { + lrRegistry + , lrTracer + , lrPeerSim = PeerSimulatorResources {psrHandles} + , lrLoEVar + } = resources diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs index 00d551248c..b9a1b6d967 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs @@ -12,6 +12,7 @@ module Test.Consensus.PeerSimulator.Run ( , runPointSchedule ) where +import Control.Monad (foldM) import Control.Monad.Class.MonadTime (MonadTime) import Control.Monad.Class.MonadTimer.SI (MonadTimer) import Control.Tracer (Tracer (..), nullTracer, traceWith) @@ -19,6 +20,7 @@ import Data.Foldable (for_) import Data.Functor (void) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map +import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config (TopLevelConfig (..)) import Ouroboros.Consensus.Genesis.Governor (runGdd, updateLoEFragGenesis) @@ -37,8 +39,7 @@ import Ouroboros.Consensus.Storage.ChainDB.Impl.Args import Ouroboros.Consensus.Util.Condense (Condense (..)) import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.ResourceRegistry -import Ouroboros.Network.AnchoredFragment (AnchoredFragment, - HasHeader) +import Ouroboros.Network.AnchoredFragment (AnchoredFragment) import qualified Ouroboros.Network.AnchoredFragment as AF import Ouroboros.Network.BlockFetch (FetchClientRegistry, bracketSyncWithFetchClient, newFetchClientRegistry) @@ -50,6 +51,7 @@ import Ouroboros.Network.Util.ShowProxy (ShowProxy) import qualified 
Test.Consensus.PeerSimulator.BlockFetch as BlockFetch import qualified Test.Consensus.PeerSimulator.ChainSync as ChainSync import Test.Consensus.PeerSimulator.Config +import Test.Consensus.PeerSimulator.NodeLifecycle import Test.Consensus.PeerSimulator.Resources import Test.Consensus.PeerSimulator.StateDiagram (peerSimStateDiagramSTMTracerDebug) @@ -64,7 +66,7 @@ import Test.Consensus.PointSchedule.Peers (Peer (..), PeerId, getPeerIds) import Test.Util.ChainDB import Test.Util.Orphans.IOLike () -import Test.Util.TestBlock (Header (..), TestBlock, testInitExtLedger) +import Test.Util.TestBlock (TestBlock) -- | Behavior config for the scheduler. data SchedulerConfig = @@ -97,6 +99,10 @@ data SchedulerConfig = -- | Whether to enable to LoP. The parameters of the LoP come from -- 'GenesisTest'. , scEnableLoP :: Bool + + -- | Enable node downtime if this is 'Just', using the value as minimum tick + -- duration to trigger it. + , scDowntime :: Maybe DiffTime } -- | Default scheduler config @@ -109,7 +115,8 @@ defaultSchedulerConfig = scTrace = True, scTraceState = False, scEnableLoE = False, - scEnableLoP = False + scEnableLoP = False, + scDowntime = Nothing } -- | Enable debug tracing during a scheduler test. @@ -201,24 +208,39 @@ startBlockFetchConnectionThread dispatchTick :: forall m blk. IOLike m => Tracer m (TraceSchedulerEvent blk) -> - Tracer m () -> - ChainDB m blk -> StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) -> Map PeerId (PeerResources m blk) -> + NodeLifecycle blk m -> + LiveNode blk m -> (Int, (DiffTime, Peer (NodeState blk))) -> - m () -dispatchTick tracer stateTracer chainDb varHandles peers (number, (duration, Peer pid state)) = + m (LiveNode blk m) +dispatchTick tracer varHandles peers lifecycle node (number, (duration, Peer pid state)) = case peers Map.!? pid of Just PeerResources {prUpdateState} -> do traceNewTick atomically (prUpdateState state) - threadDelay duration - traceWith stateTracer () + newNode <- checkDowntime + traceWith (lnStateTracer newNode) () + pure newNode Nothing -> error "“The impossible happened,” as GHC would say." where + checkDowntime + | Just minInterval <- nlMinDuration + , duration > minInterval + = do + results <- nlShutdown node + threadDelay duration + nlStart results + | otherwise + = do + threadDelay duration + pure node + + NodeLifecycle {nlMinDuration, nlStart, nlShutdown} = lifecycle + traceNewTick :: m () traceNewTick = do - currentChain <- atomically $ ChainDB.getCurrentChain chainDb + currentChain <- atomically $ ChainDB.getCurrentChain (lnChainDb node) csState <- atomically $ do m <- readTVar varHandles traverse (readTVar . CSClient.cschState) (m Map.!? pid) @@ -232,18 +254,19 @@ dispatchTick tracer stateTracer chainDb varHandles peers (number, (duration, Pee runScheduler :: IOLike m => Tracer m (TraceSchedulerEvent blk) -> - Tracer m () -> - ChainDB m blk -> StrictTVar m (Map PeerId (ChainSyncClientHandle m blk)) -> PeersSchedule blk -> Map PeerId (PeerResources m blk) -> - m () -runScheduler tracer stateTracer chainDb varHandles ps peers = do + NodeLifecycle blk m -> + m (ChainDB m blk, StateViewTracers blk m) +runScheduler tracer varHandles ps peers lifecycle@NodeLifecycle {nlStart} = do + node0 <- nlStart LiveIntervalResult {lirActive = Map.keysSet peers, lirPeerResults = []} traceWith tracer TraceBeginningOfTime - mapM_ - (dispatchTick tracer stateTracer chainDb varHandles peers) - (zip [0..] (peersStatesRelative ps)) + LiveNode {lnChainDb, lnStateViewTracers} <- foldM tick node0 (zip [0..] 
(peersStatesRelative ps)) traceWith tracer TraceEndOfTime + pure (lnChainDb, lnStateViewTracers) + where + tick = dispatchTick tracer varHandles peers lifecycle -- | Create the shared resource for the LoE if the feature is enabled in the config. -- This is used by the ChainDB and the GDD governor. @@ -257,85 +280,104 @@ mkLoEVar SchedulerConfig {scEnableLoE} | otherwise = pure LoEDisabled --- | Construct STM resources, set up ChainSync and BlockFetch threads, and --- send all ticks in a 'PointSchedule' to all given peers in turn. -runPointSchedule :: +mkStateTracer :: + IOLike m => + SchedulerConfig -> + GenesisTest TestBlock s -> + PeerSimulatorResources m TestBlock -> + ChainDB m TestBlock -> + m (Tracer m ()) +mkStateTracer schedulerConfig GenesisTest {gtBlockTree} PeerSimulatorResources {psrHandles, psrPeers} chainDb + | scTraceState schedulerConfig + , let getCandidates = viewChainSyncState psrHandles CSClient.csCandidate + getCurrentChain = ChainDB.getCurrentChain chainDb + getPoints = traverse readTVar (srCurrentState . prShared <$> psrPeers) + = peerSimStateDiagramSTMTracerDebug gtBlockTree getCurrentChain getCandidates getPoints + | otherwise + = pure nullTracer + +-- | Start all threads for ChainSync, BlockFetch and GDD, using the resources +-- for a single live interval. +-- Only start peers that haven't been disconnected in a previous interval, +-- provided by 'LiveIntervalResult'. +startNode :: forall m. (IOLike m, MonadTime m, MonadTimer m) => SchedulerConfig -> GenesisTestFull TestBlock -> - Tracer m (TraceEvent TestBlock) -> - m (StateView TestBlock) -runPointSchedule schedulerConfig genesisTest tracer0 = - withRegistry $ \registry -> do - stateViewTracers <- defaultStateViewTracers - resources <- makePeerSimulatorResources tracer gtBlockTree (getPeerIds gtSchedule) - let - handles = psrHandles resources - getCandidates = viewChainSyncState handles CSClient.csCandidate - loEVar <- mkLoEVar schedulerConfig - chainDb <- mkChainDb tracer config registry (readTVarIO <$> loEVar) - fetchClientRegistry <- newFetchClientRegistry - let chainDbView = CSClient.defaultChainDbView chainDb - for_ (psrPeers resources) $ \PeerResources {prShared, prChainSync, prBlockFetch} -> do - forkLinkedThread registry ("Peer overview " ++ (show $ srPeerId prShared)) $ - -- The peerRegistry helps ensuring that if any thread fails, then - -- the registry is closed and all threads related to the peer are - -- killed. - withRegistry $ \peerRegistry -> do - (csClient, csServer) <- startChainSyncConnectionThread peerRegistry tracer config chainDbView fetchClientRegistry prShared prChainSync chainSyncTimeouts_ chainSyncLoPBucketConfig stateViewTracers (psrHandles resources) - BlockFetch.startKeepAliveThread peerRegistry fetchClientRegistry (srPeerId prShared) - (bfClient, bfServer) <- startBlockFetchConnectionThread peerRegistry tracer stateViewTracers fetchClientRegistry (pure Continue) prShared prBlockFetch blockFetchTimeouts_ - waitAnyThread [csClient, csServer, bfClient, bfServer] - -- The block fetch logic needs to be started after the block fetch clients - -- otherwise, an internal assertion fails because getCandidates yields more - -- peer fragments than registered clients. - let getCurrentChain = ChainDB.getCurrentChain chainDb - getPoints = traverse readTVar (srCurrentState . 
prShared <$> psrPeers resources) - mkStateTracer - | scTraceState schedulerConfig - = peerSimStateDiagramSTMTracerDebug gtBlockTree getCurrentChain getCandidates getPoints - | otherwise - = pure nullTracer - - gdd = updateLoEFragGenesis config (mkGDDTracerTestBlock tracer) (readTVar handles) - -- We make GDD rerun every time the anchor or the blocks of the - -- selection change. - gddTrigger = do - s <- viewChainSyncState handles (\ s -> (csLatestSlot s, csIdling s)) - c <- getCurrentChain - return (s, [AF.anchorToHash $ AF.headAnchor c]) - - stateTracer <- mkStateTracer - BlockFetch.startBlockFetchLogic registry tracer chainDb fetchClientRegistry getCandidates - for_ loEVar $ \ var -> - void $ forkLinkedThread registry "LoE updater background" $ - runGdd gdd var chainDb gddTrigger - runScheduler - (Tracer $ traceWith tracer . TraceSchedulerEvent) - stateTracer - chainDb - handles - gtSchedule - (psrPeers resources) - snapshotStateView stateViewTracers chainDb + LiveInterval TestBlock m -> + m () +startNode schedulerConfig genesisTest interval = do + let + handles = psrHandles lrPeerSim + getCandidates = viewChainSyncState handles CSClient.csCandidate + fetchClientRegistry <- newFetchClientRegistry + let chainDbView = CSClient.defaultChainDbView lnChainDb + activePeers = Map.restrictKeys (psrPeers lrPeerSim) (lirActive liveResult) + for_ activePeers $ \PeerResources {prShared, prChainSync, prBlockFetch} -> do + let pid = srPeerId prShared + forkLinkedThread lrRegistry ("Peer overview " ++ show pid) $ + -- The peerRegistry helps ensuring that if any thread fails, then + -- the registry is closed and all threads related to the peer are + -- killed. + withRegistry $ \peerRegistry -> do + (csClient, csServer) <- + startChainSyncConnectionThread + peerRegistry + lrTracer + lrConfig + chainDbView + fetchClientRegistry + prShared + prChainSync + chainSyncTimeouts_ + chainSyncLoPBucketConfig + lnStateViewTracers + handles + BlockFetch.startKeepAliveThread peerRegistry fetchClientRegistry pid + (bfClient, bfServer) <- + startBlockFetchConnectionThread + peerRegistry + lrTracer + lnStateViewTracers + fetchClientRegistry + (pure Continue) + prShared + prBlockFetch + blockFetchTimeouts_ + waitAnyThread [csClient, csServer, bfClient, bfServer] + -- The block fetch logic needs to be started after the block fetch clients + -- otherwise, an internal assertion fails because getCandidates yields more + -- peer fragments than registered clients. + let getCurrentChain = ChainDB.getCurrentChain lnChainDb + + gdd = updateLoEFragGenesis lrConfig (mkGDDTracerTestBlock lrTracer) (readTVar handles) + -- We make GDD rerun every time the anchor or the blocks of the + -- selection change. 
+ gddTrigger = do + s <- viewChainSyncState handles (\ s -> (csLatestSlot s, csIdling s)) + c <- getCurrentChain + return (s, [AF.anchorToHash $ AF.headAnchor c]) + + BlockFetch.startBlockFetchLogic lrRegistry lrTracer lnChainDb fetchClientRegistry getCandidates + for_ lrLoEVar $ \ var -> do + forkLinkedThread lrRegistry "LoE updater background" $ + void $ runGdd gdd var lnChainDb gddTrigger where + LiveResources {lrRegistry, lrTracer, lrConfig, lrPeerSim, lrLoEVar} = resources + + LiveInterval { + liResources = resources + , liResult = liveResult + , liNode = LiveNode {lnChainDb, lnStateViewTracers} + } = interval + GenesisTest { - gtSecurityParam = k - , gtBlockTree - , gtSchedule - , gtChainSyncTimeouts + gtChainSyncTimeouts , gtBlockFetchTimeouts , gtLoPBucketParams = LoPBucketParams { lbpCapacity, lbpRate } - , gtForecastRange - , gtGenesisWindow } = genesisTest - config = defaultCfg k gtForecastRange gtGenesisWindow - - -- FIXME: This type of configuration should move to `Trace.mkTracer`. - tracer = if scTrace schedulerConfig then tracer0 else nullTracer - chainSyncTimeouts_ = if scEnableChainSyncTimeouts schedulerConfig then gtChainSyncTimeouts @@ -351,33 +393,69 @@ runPointSchedule schedulerConfig genesisTest tracer0 = then gtBlockFetchTimeouts else BlockFetch.blockFetchNoTimeouts --- | Create a ChainDB and start a BlockRunner that operate on the peers' --- candidate fragments. -mkChainDb :: - IOLike m => +-- | Set up all resources related to node start/shutdown. +nodeLifecycle :: + (IOLike m, MonadTime m, MonadTimer m) => + SchedulerConfig -> + GenesisTestFull TestBlock -> Tracer m (TraceEvent TestBlock) -> - TopLevelConfig TestBlock -> ResourceRegistry m -> - GetLoEFragment m TestBlock -> - m (ChainDB m TestBlock) -mkChainDb tracer nodeCfg registry cdbsLoE = do - chainDbArgs <- do - mcdbNodeDBs <- emptyNodeDBs - let args = updateTracer - (Tracer (traceWith tracer . TraceChainDBEvent)) - (fromMinimalChainDbArgs MinimalChainDbArgs { - mcdbTopLevelConfig = nodeCfg - , mcdbChunkInfo = mkTestChunkInfo nodeCfg - , mcdbInitLedger = testInitExtLedger - , mcdbRegistry = registry - , mcdbNodeDBs - } - ) - pure $ args { ChainDB.Impl.cdbsArgs = (ChainDB.Impl.cdbsArgs args) { cdbsLoE } } - (_, (chainDB, ChainDB.Impl.Internal{intAddBlockRunner})) <- - allocate - registry - (\_ -> ChainDB.Impl.openDBInternal chainDbArgs False) - (ChainDB.closeDB . fst) - _ <- forkLinkedThread registry "AddBlockRunner" intAddBlockRunner - pure chainDB + PeerSimulatorResources m TestBlock -> + m (NodeLifecycle TestBlock m) +nodeLifecycle schedulerConfig genesisTest lrTracer lrRegistry lrPeerSim = do + lrCdb <- emptyNodeDBs + lrLoEVar <- mkLoEVar schedulerConfig + let + resources = + LiveResources { + lrRegistry + , lrTracer + , lrSTracer = mkStateTracer schedulerConfig genesisTest lrPeerSim + , lrConfig + , lrPeerSim + , lrCdb + , lrLoEVar + } + pure NodeLifecycle { + nlMinDuration = scDowntime schedulerConfig + , nlStart = lifecycleStart (startNode schedulerConfig genesisTest) resources + , nlShutdown = lifecycleStop resources + } + where + lrConfig = defaultCfg k gtForecastRange gtGenesisWindow + + GenesisTest { + gtSecurityParam = k + , gtForecastRange + , gtGenesisWindow + } = genesisTest + +-- | Construct STM resources, set up ChainSync and BlockFetch threads, and +-- send all ticks in a 'PointSchedule' to all given peers in turn. +runPointSchedule :: + forall m. 
+ (IOLike m, MonadTime m, MonadTimer m) => + SchedulerConfig -> + GenesisTestFull TestBlock -> + Tracer m (TraceEvent TestBlock) -> + m (StateView TestBlock) +runPointSchedule schedulerConfig genesisTest tracer0 = + withRegistry $ \registry -> do + peerSim <- makePeerSimulatorResources tracer gtBlockTree (getPeerIds gtSchedule) + lifecycle <- nodeLifecycle schedulerConfig genesisTest tracer registry peerSim + (chainDb, stateViewTracers) <- runScheduler + (Tracer $ traceWith tracer . TraceSchedulerEvent) + (psrHandles peerSim) + gtSchedule + (psrPeers peerSim) + lifecycle + snapshotStateView stateViewTracers chainDb + where + + GenesisTest { + gtBlockTree + , gtSchedule + } = genesisTest + + -- FIXME: This type of configuration should move to `Trace.mkTracer`. + tracer = if scTrace schedulerConfig then tracer0 else nullTracer diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/StateView.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/StateView.hs index 3d83b5a4fa..f8c1fbff7b 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/StateView.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/StateView.hs @@ -15,10 +15,12 @@ module Test.Consensus.PeerSimulator.StateView ( , exceptionsByComponent , pscrToException , snapshotStateView + , stateViewTracersWithInitial ) where -import Control.Tracer (Tracer) +import Control.Tracer (Tracer, traceWith) import Data.Containers.ListUtils (nubOrd) +import Data.Foldable (for_) import Data.List (sort) import Data.Maybe (mapMaybe) import Network.TypedProtocol.Codec (AnyMessage) @@ -196,6 +198,16 @@ defaultStateViewTracers = do (svtPeerSimulatorResultsTracer, svtGetPeerSimulatorResults) <- recordingTracerTVar pure StateViewTracers {svtPeerSimulatorResultsTracer, svtGetPeerSimulatorResults} +-- | Call 'defaultStateViewTracers' and add the provided results. +stateViewTracersWithInitial :: + IOLike m => + [PeerSimulatorResult blk] -> + m (StateViewTracers blk m) +stateViewTracersWithInitial initial = do + svt <- defaultStateViewTracers + for_ initial (traceWith (svtPeerSimulatorResultsTracer svt)) + pure svt + -- | Use the state view tracers as well as some extra information to produce a -- state view. This mostly consists in reading and storing the current state of -- the tracers. diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs index 2342b3e01e..b3f42c5b3f 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs @@ -59,6 +59,10 @@ data TraceSchedulerEvent blk -- @0@), the duration of the tick, the states, the current chain and the -- candidate fragment. 
TraceNewTick Int DiffTime (Peer (NodeState blk)) (AnchoredFragment (Header blk)) (Maybe (AnchoredFragment (Header blk))) + | TraceNodeShutdownStart (WithOrigin SlotNo) + | TraceNodeShutdownComplete + | TraceNodeStartupStart + | TraceNodeStartupComplete (AnchoredFragment (Header blk)) type HandlerName = String @@ -192,6 +196,15 @@ traceSchedulerEventTestBlockWith setTickTime tracer0 _tracer = \case " candidate fragment: " ++ maybe "Nothing" terseHFragment mCandidateFrag ] + TraceNodeShutdownStart immTip -> + traceWith tracer0 (" Initiating node shutdown with immutable tip at slot " ++ condense immTip) + TraceNodeShutdownComplete -> + traceWith tracer0 " Node shutdown complete" + TraceNodeStartupStart -> + traceWith tracer0 " Initiating node startup" + TraceNodeStartupComplete selection -> + traceWith tracer0 (" Node startup complete with selection " ++ terseHFragment selection) + traceScheduledServerHandlerEventTestBlockWith :: Tracer m String -> String -> @@ -394,9 +407,12 @@ terseGDDEvent = \case block = if hasBlockAfter then ", has header after sgen" else " " + -- Note: At some point, I changed this to use @headPoint@ erroneously, so to be clear about what this signifies: + -- The first point after the anchor (which is returned by @lastPoint@, clearly) is used for the condition that + -- the density comparison should not be applied to two peers if they share any headers after the LoE fragment. lastPoint = "point: " ++ - tersePoint (castPoint @(Header TestBlock) @TestBlock (AF.headPoint clippedFragment)) ++ + tersePoint (castPoint @(Header TestBlock) @TestBlock (AF.lastPoint clippedFragment)) ++ ", " showLatestSlot = \case diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PointSchedule.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PointSchedule.hs index b22189635f..0d3aaa7848 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PointSchedule.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PointSchedule.hs @@ -32,6 +32,7 @@ module Test.Consensus.PointSchedule ( , PeersSchedule , RunGenesisTestResult (..) 
, enrichedWith + , ensureScheduleDuration , genesisNodeState , longRangeAttack , nsTipTip @@ -43,6 +44,7 @@ module Test.Consensus.PointSchedule ( , prettyPeersSchedule , stToGen , uniformPoints + , uniformPointsWithDowntime ) where import Cardano.Slotting.Time (SlotLength) @@ -52,7 +54,7 @@ import Control.Monad.ST (ST) import Data.Foldable (toList) import Data.Functor (($>)) import Data.List (mapAccumL, partition, scanl') -import Data.Maybe (mapMaybe) +import Data.Maybe (catMaybes, fromMaybe, mapMaybe) import Data.Time (DiffTime) import Data.Word (Word64) import Ouroboros.Consensus.Block.Abstract (WithOrigin (..), @@ -60,13 +62,14 @@ import Ouroboros.Consensus.Block.Abstract (WithOrigin (..), import Ouroboros.Consensus.Ledger.SupportsProtocol (GenesisWindow (..)) import Ouroboros.Consensus.Network.NodeToNode (ChainSyncTimeout (..)) -import Ouroboros.Consensus.Protocol.Abstract (SecurityParam, - maxRollbacks) +import Ouroboros.Consensus.Protocol.Abstract + (SecurityParam (SecurityParam), maxRollbacks) import Ouroboros.Consensus.Util.Condense (Condense (..), CondenseList (..), PaddingDirection (..), condenseListWithPadding, padListWith) import qualified Ouroboros.Network.AnchoredFragment as AF -import Ouroboros.Network.Block (Tip (..), tipFromHeader) +import Ouroboros.Network.Block (SlotNo (..), Tip (..), blockSlot, + tipFromHeader) import Ouroboros.Network.Point (withOrigin) import qualified System.Random.Stateful as Random import System.Random.Stateful (STGenM, StatefulGen, runSTGen_) @@ -306,6 +309,132 @@ uniformPoints BlockTree {btTrunk, btBranches} g = do rollbackProb = 0.2 +minusClamp :: (Ord a, Num a) => a -> a -> a +minusClamp a b | a <= b = 0 + | otherwise = a - b + +zipPadN :: forall a . [[a]] -> [[Maybe a]] +zipPadN = + spin [] + where + spin acc as + | all null as + = reverse acc + | let (h, t) = unzip (takeNext <$> as) + = spin (h : acc) t + + takeNext = \case + [] -> (Nothing, []) + h : t -> (Just h, t) + +isTip :: SchedulePoint blk -> Bool +isTip = \case + ScheduleTipPoint _ -> True + _ -> False + +tipTimes :: [(Time, SchedulePoint blk)] -> [Time] +tipTimes = + fmap fst . filter (isTip . snd) + +bumpTips :: [Time] -> [(Time, SchedulePoint blk)] -> [(Time, SchedulePoint blk)] +bumpTips tips = + snd . mapAccumL step tips + where + step (t0 : tn) (_, p) + | isTip p + = (tn, (t0, p)) + step ts a = (ts, a) + +syncTips :: [(Time, SchedulePoint blk)] -> [[(Time, SchedulePoint blk)]] -> ([(Time, SchedulePoint blk)], [[(Time, SchedulePoint blk)]]) +syncTips honest advs = + (bump honest, bump <$> advs) + where + bump = bumpTips earliestTips + earliestTips = chooseEarliest <$> zipPadN (tipTimes <$> scheds) + scheds = honest : advs + chooseEarliest times = minimum (fromMaybe (Time 0) <$> times) + +-- | This is a variant of 'uniformPoints' that uses multiple tip points, used to simulate node downtimes. +-- Ultimately, this should be replaced by a redesign of the peer schedule generator that is aware of node liveness +-- intervals. +-- +-- Chooses the first tip points somewhere in the middle of the honest chain: +-- The "pause slot" is half of the honest head slot, or the slot of the kth block, whichever is greater. +-- The last block smaller than the pause slot is then used as the first tip for each branch. +-- The second tip is the last block of each branch. +-- +-- Includes rollbacks in some schedules. 
+uniformPointsWithDowntime :: + (StatefulGen g m, AF.HasHeader blk) => + SecurityParam -> + BlockTree blk -> + g -> + m (PeersSchedule blk) +uniformPointsWithDowntime (SecurityParam k) BlockTree {btTrunk, btBranches} g = do + let + kSlot = withOrigin 0 (fromIntegral . unSlotNo) (AF.headSlot (AF.takeOldest (fromIntegral k) btTrunk)) + midSlot = (AF.length btTrunk) `div` 2 + lowerBound = max kSlot midSlot + pauseSlot <- SlotNo . fromIntegral <$> Random.uniformRM (lowerBound, AF.length btTrunk - 1) g + honestTip0 <- firstTip pauseSlot btTrunk + honest <- mkSchedule [(IsTrunk, [honestTip0, minusClamp (AF.length btTrunk) 1])] [] + advs <- takeBranches pauseSlot btBranches + let (honest', advs') = syncTips honest advs + pure (mkPeers honest' advs') + where + takeBranches pause = \case + [] -> pure [] + [b] -> pure <$> withoutRollback pause b + b1 : b2 : branches -> do + a <- Random.uniformDouble01M g + if a < rollbackProb + then do + this <- withRollback pause b1 b2 + rest <- takeBranches pause branches + pure (this : rest) + else do + this <- withoutRollback pause b1 + rest <- takeBranches pause (b2 : branches) + pure (this : rest) + + withoutRollback pause branch = do + tips <- mkTips pause branch + mkSchedule tips [btbSuffix branch] + + withRollback pause b1 b2 = do + firstTips <- mkTips pause b1 + let secondTips = [minusClamp (AF.length (btbSuffix b2)) 1] + mkSchedule (firstTips ++ [(IsBranch, secondTips)]) [btbSuffix b1, btbSuffix b2] + + mkSchedule tips branches = do + params <- mkParams + peerScheduleFromTipPoints g params tips btTrunk branches + + mkTips pause branch + | AF.length full == 0 = + error "empty branch" + | otherwise = do + tip0 <- firstTip pause (btbFull branch) + let (pre, post) = partition (< firstSuffixBlock) [tip0, fullLen - 1] + pure ((if null pre then [] else [(IsTrunk, pre)]) ++ [(IsBranch, shift <$> post)]) + where + shift i = i - firstSuffixBlock + firstSuffixBlock = fullLen - AF.length (btbSuffix branch) + fullLen = AF.length full + full = btbFull branch + + firstTip pause frag = pure (minusClamp (AF.length (AF.dropWhileNewest (\ b -> blockSlot b > pause) frag)) 1) + + mkParams = do + -- These values appear to be large enough to create pauses of 100 seconds and more. + tipL <- uniformRMDiffTime (0.5, 1) g + tipU <- uniformRMDiffTime (1, 2) g + headerL <- uniformRMDiffTime (0.018, 0.03) g + headerU <- uniformRMDiffTime (0.021, 0.04) g + pure defaultPeerScheduleParams {pspTipDelayInterval = (tipL, tipU), pspHeaderDelayInterval = (headerL, headerU)} + + rollbackProb = 0.2 + newtype ForecastRange = ForecastRange { unForecastRange :: Word64 } deriving (Show) @@ -397,3 +526,28 @@ stToGen :: stToGen gen = do seed :: QCGen <- arbitrary pure (runSTGen_ seed gen) + +duplicateLastPoint + :: DiffTime -> [(Time, SchedulePoint blk)] -> [(Time, SchedulePoint blk)] +duplicateLastPoint d [] = [(Time d, ScheduleTipPoint Origin)] +duplicateLastPoint d xs = + let (t, p) = last xs + in xs ++ [(addTime d t, p)] + +ensureScheduleDuration :: GenesisTest blk a -> PeersSchedule blk -> PeersSchedule blk +ensureScheduleDuration gt Peers {honest, others} = + Peers {honest = extendHonest, others} + where + extendHonest = duplicateLastPoint endingDelay <$> honest + + endingDelay = + let cst = gtChainSyncTimeouts gt + bft = gtBlockFetchTimeouts gt + in 1 + fromIntegral peerCount * maximum (0 : catMaybes + [ canAwaitTimeout cst + , intersectTimeout cst + , busyTimeout bft + , streamingTimeout bft + ]) + + peerCount = 1 + length others
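
Illustrative sketch (not part of the patch): the contract that `runScheduler` and `dispatchTick` implement against the new `NodeLifecycle` record. The first `nlStart` receives an empty result list and the full peer set; every tick whose duration exceeds `nlMinDuration` is bracketed by `nlShutdown` and `nlStart`. The driver name and the bare list of tick durations are hypothetical; the types and fields are the ones introduced in Test.Consensus.PeerSimulator.NodeLifecycle, and the imports and extensions of Test.Consensus.PeerSimulator.Run (foldM, threadDelay, NamedFieldPuns) are assumed to be in scope.

exampleLifecycleDriver ::
  IOLike m =>
  NodeLifecycle blk m ->
  Set PeerId ->   -- all peers, used only for the very first start
  [DiffTime] ->   -- tick durations
  m (LiveNode blk m)
exampleLifecycleDriver NodeLifecycle {nlMinDuration, nlStart, nlShutdown} allPeers ticks = do
  -- First interval: no prior peer results, every peer is active.
  node0 <- nlStart LiveIntervalResult {lirPeerResults = [], lirActive = allPeers}
  foldM step node0 ticks
  where
    step node duration
      | Just minDuration <- nlMinDuration
      , duration > minDuration = do
          -- Long tick: persist the immutable DB, kill the node's threads,
          -- let the tick's time pass, then restart with the surviving peers.
          results <- nlShutdown node
          threadDelay duration
          nlStart results
      | otherwise = do
          threadDelay duration
          pure node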
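
In the same spirit (also not part of the patch), a minimal downtime-enabled property modelled on `prop_downtime` above. The property name and the 20-second threshold are made up for illustration; the generators, `SchedulerConfig` fields and helpers are the ones added or modified by this patch, and the imports of Test.Consensus.Genesis.Tests.Uniform are assumed.

prop_downtimeExample :: Property
prop_downtimeExample = forAllGenesisTest

  -- Generate a small block tree and schedules whose peers serve their tips in
  -- two stages, then pad the honest schedule based on the configured
  -- ChainSync/BlockFetch timeouts.
  (genChains (QC.choose (1, 4)) `enrichedWith` \ gt ->
    ensureScheduleDuration gt <$>
      stToGen (uniformPointsWithDowntime (gtSecurityParam gt) (gtBlockTree gt)))

  -- Shut the node down, and later restart it, on every tick longer than 20s.
  (defaultSchedulerConfig
    {scEnableLoE = True, scEnableLoP = True, scDowntime = Just 20})

  shrinkPeerSchedules

  theProperty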