diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Setup.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Setup.hs index a9e9dd8bd1..b06fa9e5d1 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Setup.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Genesis/Setup.hs @@ -38,7 +38,7 @@ runTest genesisTest@GenesisTest {gtBlockTree, gtHonestAsc} schedule makeProperty mapM_ (traceWith tracer) $ BT.prettyPrint gtBlockTree - result <- runPointSchedule genesisTest schedule tracer + result <- runPointSchedule schedulerConfig genesisTest schedule tracer trace <- unlines <$> getTrace let @@ -49,3 +49,5 @@ runTest genesisTest@GenesisTest {gtBlockTree, gtHonestAsc} schedule makeProperty counterexample ("result: " <> condense fragment) (makeProperty fragment) pure $ counterexample trace prop + where + schedulerConfig = SchedulerConfig {enableTimeouts = False} diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Network/Driver/Limits/Extras.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Network/Driver/Limits/Extras.hs index d674059d86..9b6641c1ce 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Network/Driver/Limits/Extras.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/Network/Driver/Limits/Extras.hs @@ -8,6 +8,7 @@ module Test.Consensus.Network.Driver.Limits.Extras ( chainSyncNoSizeLimits + , chainSyncNoTimeouts , chainSyncTimeouts , runConnectedPeersPipelinedWithLimits ) where @@ -95,3 +96,11 @@ chainSyncTimeouts t f = mustReplyTimeout = Just $ secondsToDiffTime $ round $ realToFrac (getSlotLength t) * log (1 - 0.999) / log (1 - ascVal f) + +chainSyncNoTimeouts :: ChainSyncTimeout +chainSyncNoTimeouts = + ChainSyncTimeout { + canAwaitTimeout = Nothing + , intersectTimeout = Nothing + , mustReplyTimeout = Nothing + } diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Handlers.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Handlers.hs index 8dd95f887c..52f0ba022e 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Handlers.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Handlers.hs @@ -28,7 +28,7 @@ import qualified Test.Consensus.BlockTree as BT import Test.Consensus.BlockTree (BlockTree) import Test.Consensus.PeerSimulator.ScheduledChainSyncServer (FindIntersect (..), - RequestNext (RollBackward, RollForward)) + RequestNext (AwaitReply, RollBackward, RollForward)) import Test.Consensus.PointSchedule (AdvertisedPoints (header, tip), HeaderPoint (HeaderPoint), TipPoint (TipPoint)) import Test.Util.Orphans.IOLike () @@ -52,7 +52,7 @@ handlerFindIntersection :: STM m (FindIntersect, [String]) handlerFindIntersection currentIntersection blockTree points clientPoints = do let TipPoint tip' = tip points - tipPoint = Ouroboros.Network.Block.getTipPoint tip' + tipPoint = getTipPoint tip' fragment = fromJust $ BT.findFragment tipPoint blockTree case intersectWith fragment clientPoints of Nothing -> @@ -81,16 +81,20 @@ handlerRequestNext currentIntersection blockTree points = runWriterT $ do intersection <- lift $ readTVar currentIntersection trace $ " last intersection is " ++ condense intersection - let HeaderPoint header' = header points - headerPoint = AF.castPoint $ blockPoint header' maybe noPathError analysePath (BT.findPath intersection headerPoint blockTree) where noPathError = error "serveHeader: intersection and and headerPoint should always be in the block tree" analysePath = \case - -- If the anchor is the intersection (the source of the path-finding) - -- but the fragment is empty, then the intersection is exactly our - -- header point and there is nothing to do. + -- If the anchor is the intersection (the source of the path-finding) but + -- the fragment is empty, then the intersection is exactly our header + -- point and there is nothing to do. If additionally the header point is + -- also the tip point (because we served our whole chain, or we are + -- stalling as an adversarial behaviour), then we ask the client to wait; + -- otherwise we just do nothing. + (BT.PathAnchoredAtSource True, AF.Empty _) | getTipPoint tip' == headerPoint -> do + trace " chain has been fully served" + pure (Just AwaitReply) (BT.PathAnchoredAtSource True, AF.Empty _) -> do trace " intersection is exactly our header point" pure Nothing @@ -105,6 +109,15 @@ handlerRequestNext currentIntersection blockTree points = -- the intersection is further than the tip that we can serve. (BT.PathAnchoredAtSource False, AF.Empty _) -> do trace " intersection is further than our header point" + -- REVIEW: The following is a hack that allows the honest peer to not + -- get disconnected when it falls behind. Why does a peer doing that not + -- get disconnected from? + -- + -- We decided to hold off on making this work with timeouts, so we'll return + -- Nothing here for now. + -- The consequence of this is that a slow peer will just block until it reaches + -- the fork intersection in its schedule. + -- pure (Just AwaitReply) pure Nothing -- If the anchor is not the intersection and the fragment is non-empty, -- then we require a rollback @@ -112,9 +125,12 @@ handlerRequestNext currentIntersection blockTree points = trace $ " we will require a rollback to" ++ condense (AF.anchorPoint fragment) trace $ " fragment: " ++ condense fragment let - tip' = coerce (tip points) point = AF.anchorPoint fragment lift $ writeTVar currentIntersection point pure $ Just (RollBackward point tip') + HeaderPoint header' = header points + headerPoint = AF.castPoint $ blockPoint header' + TipPoint tip' = tip points + trace = tell . pure diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs index 8ac3c9c26a..890dc93610 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Run.hs @@ -6,6 +6,7 @@ module Test.Consensus.PeerSimulator.Run ( ChainSyncException (..) + , SchedulerConfig (..) , runPointSchedule ) where @@ -13,7 +14,7 @@ import Control.Monad.Class.MonadAsync (AsyncCancelled (AsyncCancelled)) import Control.Monad.Class.MonadTime (MonadTime) import Control.Monad.Class.MonadTimer.SI (MonadTimer) -import Control.Tracer (Tracer (Tracer), nullTracer, traceWith) +import Control.Tracer (Tracer, nullTracer, traceWith) import Data.Foldable (for_) import Data.Functor (void) import Data.List.NonEmpty (NonEmpty, nonEmpty) @@ -43,7 +44,6 @@ import Ouroboros.Network.Channel (createConnectedChannels) import Ouroboros.Network.ControlMessage (ControlMessage (..), ControlMessageSTM) import Ouroboros.Network.Driver.Limits -import Ouroboros.Network.Driver.Simple (Role (Client, Server)) import Ouroboros.Network.Protocol.ChainSync.ClientPipelined (ChainSyncClientPipelined, chainSyncClientPeerPipelined) import Ouroboros.Network.Protocol.ChainSync.Codec @@ -67,6 +67,17 @@ import Test.Util.ChainDB import Test.Util.Orphans.IOLike () import Test.Util.TestBlock (Header (..), TestBlock, testInitExtLedger) +-- | Behavior config for the scheduler. +data SchedulerConfig = + SchedulerConfig { + -- | Whether to use timouts for the ChainSync protocol. + -- These apply when the client sends a MsgRequestNext and the server doesn't reply. + -- Because the point schedule cannot yet handle the case where a slow peer has a + -- header point that's behind the latest header that another peer has sent, we need + -- to be able to disable them. + enableTimeouts :: Bool + } + deriving (Show) basicChainSyncClient :: IOLike m => @@ -112,11 +123,13 @@ startChainSyncConnectionThread :: FetchClientRegistry PeerId (Header TestBlock) TestBlock m -> SharedResources m -> ChainSyncResources m -> + SchedulerConfig -> m (StrictTVar m (Maybe ChainSyncException)) -startChainSyncConnectionThread registry tracer cfg activeSlotCoefficient chainDbView fetchClientRegistry SharedResources {srPeerId, srCandidateFragment} ChainSyncResources {csrServer} = do +startChainSyncConnectionThread registry tracer cfg activeSlotCoefficient chainDbView fetchClientRegistry SharedResources {srPeerId, srCandidateFragment} ChainSyncResources {csrServer} SchedulerConfig {enableTimeouts} = do let slotLength = HardFork.eraSlotLength . topLevelConfigLedger $ cfg - timeouts = chainSyncTimeouts slotLength activeSlotCoefficient + timeouts | enableTimeouts = chainSyncTimeouts slotLength activeSlotCoefficient + | otherwise = chainSyncNoTimeouts traceWith tracer $ "timeouts:" traceWith tracer $ " canAwait = " ++ show (canAwaitTimeout timeouts) traceWith tracer $ " intersect = " ++ show (intersectTimeout timeouts) @@ -126,7 +139,7 @@ startChainSyncConnectionThread registry tracer cfg activeSlotCoefficient chainDb bracketSyncWithFetchClient fetchClientRegistry srPeerId $ do res <- try $ runConnectedPeersPipelinedWithLimits createConnectedChannels - protocolTracer + nullTracer codecChainSyncId chainSyncNoSizeLimits (timeLimitsChainSync timeouts) @@ -146,18 +159,6 @@ startChainSyncConnectionThread registry tracer cfg activeSlotCoefficient chainDb Right res' -> pure res' pure chainSyncException - where - protocolTracer = Tracer $ \case - (clientOrServer, TraceSendMsg payload) -> - traceUnitWith - tracer - ("Protocol ChainSync " ++ condense srPeerId) - (case clientOrServer of - Client -> "Client -> Server" - Server -> "Server -> Client" - ++ ": " ++ show payload) - _ -> pure () - -- | Start the BlockFetch client, using the supplied 'FetchClientRegistry' to -- register it for synchronization with the ChainSync client. startBlockFetchConnectionThread :: @@ -237,11 +238,12 @@ runScheduler tracer (PointSchedule ps) peers = do runPointSchedule :: forall m. (IOLike m, MonadTime m, MonadTimer m) => + SchedulerConfig -> GenesisTest -> PointSchedule -> Tracer m String -> m (Either (NonEmpty ChainSyncException) TestFragH) -runPointSchedule GenesisTest {gtSecurityParam = k, gtHonestAsc = asc, gtBlockTree} pointSchedule tracer = +runPointSchedule schedulerConfig GenesisTest {gtSecurityParam = k, gtHonestAsc = asc, gtBlockTree} pointSchedule tracer = withRegistry $ \registry -> do resources <- makePeersResources tracer gtBlockTree (pointSchedulePeers pointSchedule) let candidates = srCandidateFragment . prShared <$> resources @@ -250,7 +252,7 @@ runPointSchedule GenesisTest {gtSecurityParam = k, gtHonestAsc = asc, gtBlockTre fetchClientRegistry <- newFetchClientRegistry let chainDbView = defaultChainDbView chainDb chainSyncRess <- for resources $ \PeerResources {prShared, prChainSync} -> do - chainSyncRes <- startChainSyncConnectionThread registry tracer config asc chainDbView fetchClientRegistry prShared prChainSync + chainSyncRes <- startChainSyncConnectionThread registry tracer config asc chainDbView fetchClientRegistry prShared prChainSync schedulerConfig PeerSimulator.BlockFetch.startKeepAliveThread registry fetchClientRegistry (srPeerId prShared) pure chainSyncRes for_ resources $ \PeerResources {prShared} -> diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/ScheduledChainSyncServer.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/ScheduledChainSyncServer.hs index f7f15ee61e..66e4562777 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/ScheduledChainSyncServer.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/ScheduledChainSyncServer.hs @@ -1,6 +1,5 @@ -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} -- | A ChainSync protocol server that allows external scheduling of its -- operations, while deferring the implementation of the message handler @@ -15,7 +14,6 @@ module Test.Consensus.PeerSimulator.ScheduledChainSyncServer ( import Control.Tracer (Tracer (Tracer), traceWith) import Data.Foldable (traverse_) -import Data.Functor (void) import Ouroboros.Consensus.Block.Abstract (Point (..)) import Ouroboros.Consensus.Util.Condense (Condense (..)) import Ouroboros.Consensus.Util.IOLike (IOLike, MonadSTM (STM), @@ -35,6 +33,8 @@ data RequestNext = RollForward (Header TestBlock) (Tip TestBlock) | RollBackward (Point TestBlock) (Tip TestBlock) + | + AwaitReply deriving (Eq, Show) -- | Pure representation of the messages produced by the handler for the @StIntersect@ @@ -150,11 +150,25 @@ scheduledChainSyncServer server@ScheduledChainSyncServer {scssHandlers, scssTrac pure $ Left $ SendMsgRollForward header tip go Just (RollBackward point tip) -> do trace "done handling MsgRequestNext" - pure $ Left $ (SendMsgRollBackward point tip) go + pure $ Left $ SendMsgRollBackward point tip go + Just AwaitReply -> do + trace "done handling MsgRequestNext" + pure $ Right $ do -- beginning of the continuation + restart >>= \case + -- If we get 'Right', then we still do not have anything to serve + -- and we loop; what 'Right' contains is the continuation starting + -- at 'do' above; by unwrapping the 'Right', we do not send + -- another AwaitReply message (which Typed Protocols does not + -- allow anyway). + Right cont -> cont + Left msg -> pure msg Nothing -> do trace " cannot serve at this point; waiting for node state and starting again" - void $ awaitNextState server - recvMsgRequestNext + restart + where + -- Yield control back to the scheduler, then wait for the next state and + -- continue processing the client's current 'MsgRequestNext'. + restart = awaitNextState server *> recvMsgRequestNext recvMsgFindIntersect pts = do currentState <- ensureCurrentState server diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs index 1de8b11eb8..e4369a38ae 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs @@ -29,10 +29,9 @@ mkCdbTracer tracer = Tracer $ \case ChainDB.Impl.TraceAddBlockEvent event -> case event of - AddedToCurrentChain _ NewTipInfo {newTipPoint} _ newFragment -> do + AddedToCurrentChain _ NewTipInfo {newTipPoint} _ _ -> do trace "Added to current chain" trace $ "New tip: " ++ condense newTipPoint - trace $ "New fragment: " ++ condense newFragment SwitchedToAFork _ NewTipInfo {newTipPoint} _ newFragment -> do trace "Switched to a fork" trace $ "New tip: " ++ condense newTipPoint