Send AwaitReply to prevent timeouts. #432

Merged · 4 commits · Oct 13, 2023
@@ -38,7 +38,7 @@ runTest genesisTest@GenesisTest {gtBlockTree, gtHonestAsc} schedule makeProperty

mapM_ (traceWith tracer) $ BT.prettyPrint gtBlockTree

result <- runPointSchedule genesisTest schedule tracer
result <- runPointSchedule schedulerConfig genesisTest schedule tracer
trace <- unlines <$> getTrace

let
@@ -49,3 +49,5 @@ runTest genesisTest@GenesisTest {gtBlockTree, gtHonestAsc} schedule makeProperty
counterexample ("result: " <> condense fragment) (makeProperty fragment)

pure $ counterexample trace prop
where
schedulerConfig = SchedulerConfig {enableTimeouts = False}
@@ -8,6 +8,7 @@

module Test.Consensus.Network.Driver.Limits.Extras (
chainSyncNoSizeLimits
, chainSyncNoTimeouts
, chainSyncTimeouts
, runConnectedPeersPipelinedWithLimits
) where
@@ -95,3 +96,11 @@ chainSyncTimeouts t f =
mustReplyTimeout = Just $ secondsToDiffTime $ round $
realToFrac (getSlotLength t)
* log (1 - 0.999) / log (1 - ascVal f)

chainSyncNoTimeouts :: ChainSyncTimeout
chainSyncNoTimeouts =
ChainSyncTimeout {
canAwaitTimeout = Nothing
, intersectTimeout = Nothing
, mustReplyTimeout = Nothing
}
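
For intuition about the mustReplyTimeout formula above: it picks the duration within which the honest chain produces at least one block with probability 0.999. Below is a standalone sketch evaluating it with illustrative parameters (20s slot length, active slot coefficient 0.05; neither value is taken from this PR).

-- Standalone evaluation of the mustReplyTimeout formula; the parameter
-- values below are assumptions for illustration only.
mustReplySketch :: Integer
mustReplySketch = round (slotSeconds * log (1 - 0.999) / log (1 - asc))
  where
    slotSeconds = 20 :: Double
    asc = 0.05

-- log 0.001 / log 0.95 ≈ 135, so the timeout comes out to ≈ 2694 seconds:
-- long enough that an honest peer fails to produce a block within it with
-- probability below 0.1%.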
@@ -28,7 +28,7 @@ import qualified Test.Consensus.BlockTree as BT
import Test.Consensus.BlockTree (BlockTree)
import Test.Consensus.PeerSimulator.ScheduledChainSyncServer
(FindIntersect (..),
RequestNext (RollBackward, RollForward))
RequestNext (AwaitReply, RollBackward, RollForward))
import Test.Consensus.PointSchedule (AdvertisedPoints (header, tip),
HeaderPoint (HeaderPoint), TipPoint (TipPoint))
import Test.Util.Orphans.IOLike ()
@@ -52,7 +52,7 @@ handlerFindIntersection ::
STM m (FindIntersect, [String])
handlerFindIntersection currentIntersection blockTree points clientPoints = do
let TipPoint tip' = tip points
tipPoint = Ouroboros.Network.Block.getTipPoint tip'
tipPoint = getTipPoint tip'
fragment = fromJust $ BT.findFragment tipPoint blockTree
case intersectWith fragment clientPoints of
Nothing ->
@@ -81,16 +81,20 @@ handlerRequestNext currentIntersection blockTree points =
runWriterT $ do
intersection <- lift $ readTVar currentIntersection
trace $ " last intersection is " ++ condense intersection
let HeaderPoint header' = header points
headerPoint = AF.castPoint $ blockPoint header'
maybe noPathError analysePath (BT.findPath intersection headerPoint blockTree)
where
noPathError = error "serveHeader: intersection and headerPoint should always be in the block tree"

analysePath = \case
-- If the anchor is the intersection (the source of the path-finding)
-- but the fragment is empty, then the intersection is exactly our
-- header point and there is nothing to do.
-- If the anchor is the intersection (the source of the path-finding) but
-- the fragment is empty, then the intersection is exactly our header
-- point and there is nothing to do. If additionally the header point is
-- also the tip point (because we served our whole chain, or we are
-- stalling as an adversarial behaviour), then we ask the client to wait;
-- otherwise we just do nothing.
(BT.PathAnchoredAtSource True, AF.Empty _) | getTipPoint tip' == headerPoint -> do
trace " chain has been fully served"
pure (Just AwaitReply)
(BT.PathAnchoredAtSource True, AF.Empty _) -> do
trace " intersection is exactly our header point"
pure Nothing
@@ -105,16 +109,28 @@
-- the intersection is further than the tip that we can serve.
(BT.PathAnchoredAtSource False, AF.Empty _) -> do
trace " intersection is further than our header point"
-- REVIEW: The following is a hack that allows the honest peer to not
-- get disconnected when it falls behind. Why does a peer doing that not
-- get disconnected from?
--
-- We decided to hold off on making this work with timeouts, so we'll return
-- Nothing here for now.
-- The consequence of this is that a slow peer will just block until it reaches
-- the fork intersection in its schedule.
-- pure (Just AwaitReply)
pure Nothing
-- If the anchor is not the intersection and the fragment is non-empty,
-- then we require a rollback
(BT.PathAnchoredAtSource False, fragment) -> do
trace $ " we will require a rollback to" ++ condense (AF.anchorPoint fragment)
trace $ " fragment: " ++ condense fragment
let
tip' = coerce (tip points)
point = AF.anchorPoint fragment
lift $ writeTVar currentIntersection point
pure $ Just (RollBackward point tip')

HeaderPoint header' = header points
headerPoint = AF.castPoint $ blockPoint header'
TipPoint tip' = tip points

trace = tell . pure
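
Condensed into a decision table, the RequestNext handler above behaves as follows. The sketch below is illustrative, with hypothetical names, not code from the PR; the RollForward case is not visible in this diff, so its guard here is an assumption.

-- Hypothetical condensation of handlerRequestNext's case analysis; the
-- Booleans stand in for the pattern matches on BT.findPath's result.
data NextStep = Await | Idle | RollFwd | RollBack
  deriving Show

decideNext :: Bool -> Bool -> Bool -> NextStep
decideNext anchoredAtSource emptyFragment headerIsTip
  | anchoredAtSource, emptyFragment, headerIsTip = Await    -- whole chain served, or adversarial stalling
  | anchoredAtSource, emptyFragment              = Idle     -- intersection is exactly the header point
  | emptyFragment                                = Idle     -- peer fell behind; block until its schedule catches up
  | anchoredAtSource                             = RollFwd  -- serve the next header (case not shown in the diff)
  | otherwise                                    = RollBack -- client must roll back to the fragment anchor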
@@ -6,14 +6,15 @@

module Test.Consensus.PeerSimulator.Run (
ChainSyncException (..)
, SchedulerConfig (..)
, runPointSchedule
) where

import Control.Monad.Class.MonadAsync
(AsyncCancelled (AsyncCancelled))
import Control.Monad.Class.MonadTime (MonadTime)
import Control.Monad.Class.MonadTimer.SI (MonadTimer)
import Control.Tracer (Tracer (Tracer), nullTracer, traceWith)
import Control.Tracer (Tracer, nullTracer, traceWith)
import Data.Foldable (for_)
import Data.Functor (void)
import Data.List.NonEmpty (NonEmpty, nonEmpty)
@@ -43,7 +44,6 @@ import Ouroboros.Network.Channel (createConnectedChannels)
import Ouroboros.Network.ControlMessage (ControlMessage (..),
ControlMessageSTM)
import Ouroboros.Network.Driver.Limits
import Ouroboros.Network.Driver.Simple (Role (Client, Server))
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
(ChainSyncClientPipelined, chainSyncClientPeerPipelined)
import Ouroboros.Network.Protocol.ChainSync.Codec
@@ -67,6 +67,17 @@ import Test.Util.ChainDB
import Test.Util.Orphans.IOLike ()
import Test.Util.TestBlock (Header (..), TestBlock, testInitExtLedger)

-- | Behavior config for the scheduler.
data SchedulerConfig =
SchedulerConfig {
-- | Whether to use timeouts for the ChainSync protocol.
-- These apply when the client sends a MsgRequestNext and the server doesn't reply.
-- Because the point schedule cannot yet handle the case where a slow peer has a
-- header point that's behind the latest header that another peer has sent, we need
-- to be able to disable them.
enableTimeouts :: Bool
}
deriving (Show)

basicChainSyncClient ::
IOLike m =>
@@ -112,11 +123,13 @@ startChainSyncConnectionThread ::
FetchClientRegistry PeerId (Header TestBlock) TestBlock m ->
SharedResources m ->
ChainSyncResources m ->
SchedulerConfig ->
m (StrictTVar m (Maybe ChainSyncException))
startChainSyncConnectionThread registry tracer cfg activeSlotCoefficient chainDbView fetchClientRegistry SharedResources {srPeerId, srCandidateFragment} ChainSyncResources {csrServer} = do
startChainSyncConnectionThread registry tracer cfg activeSlotCoefficient chainDbView fetchClientRegistry SharedResources {srPeerId, srCandidateFragment} ChainSyncResources {csrServer} SchedulerConfig {enableTimeouts} = do
let
slotLength = HardFork.eraSlotLength . topLevelConfigLedger $ cfg
timeouts = chainSyncTimeouts slotLength activeSlotCoefficient
timeouts | enableTimeouts = chainSyncTimeouts slotLength activeSlotCoefficient
| otherwise = chainSyncNoTimeouts
traceWith tracer $ "timeouts:"
traceWith tracer $ " canAwait = " ++ show (canAwaitTimeout timeouts)
traceWith tracer $ " intersect = " ++ show (intersectTimeout timeouts)
@@ -126,7 +139,7 @@ startChainSyncConnectionThread registry tracer cfg activeSlotCoefficient chainDb
bracketSyncWithFetchClient fetchClientRegistry srPeerId $ do
res <- try $ runConnectedPeersPipelinedWithLimits
createConnectedChannels
protocolTracer
nullTracer
codecChainSyncId
chainSyncNoSizeLimits
(timeLimitsChainSync timeouts)
@@ -146,18 +159,6 @@ startChainSyncConnectionThread registry tracer cfg activeSlotCoefficient chainDb
Right res' -> pure res'
pure chainSyncException

where
protocolTracer = Tracer $ \case
(clientOrServer, TraceSendMsg payload) ->
traceUnitWith
tracer
("Protocol ChainSync " ++ condense srPeerId)
(case clientOrServer of
Client -> "Client -> Server"
Server -> "Server -> Client"
++ ": " ++ show payload)
_ -> pure ()

-- | Start the BlockFetch client, using the supplied 'FetchClientRegistry' to
-- register it for synchronization with the ChainSync client.
startBlockFetchConnectionThread ::
@@ -237,11 +238,12 @@ runScheduler tracer (PointSchedule ps) peers = do
runPointSchedule ::
forall m.
(IOLike m, MonadTime m, MonadTimer m) =>
SchedulerConfig ->
GenesisTest ->
PointSchedule ->
Tracer m String ->
m (Either (NonEmpty ChainSyncException) TestFragH)
runPointSchedule GenesisTest {gtSecurityParam = k, gtHonestAsc = asc, gtBlockTree} pointSchedule tracer =
runPointSchedule schedulerConfig GenesisTest {gtSecurityParam = k, gtHonestAsc = asc, gtBlockTree} pointSchedule tracer =
withRegistry $ \registry -> do
resources <- makePeersResources tracer gtBlockTree (pointSchedulePeers pointSchedule)
let candidates = srCandidateFragment . prShared <$> resources
@@ -250,7 +252,7 @@ runPointSchedule GenesisTest {gtSecurityParam = k, gtHonestAsc = asc, gtBlockTre
fetchClientRegistry <- newFetchClientRegistry
let chainDbView = defaultChainDbView chainDb
chainSyncRess <- for resources $ \PeerResources {prShared, prChainSync} -> do
chainSyncRes <- startChainSyncConnectionThread registry tracer config asc chainDbView fetchClientRegistry prShared prChainSync
chainSyncRes <- startChainSyncConnectionThread registry tracer config asc chainDbView fetchClientRegistry prShared prChainSync schedulerConfig
PeerSimulator.BlockFetch.startKeepAliveThread registry fetchClientRegistry (srPeerId prShared)
pure chainSyncRes
for_ resources $ \PeerResources {prShared} ->
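
Taken together with the SchedulerConfig introduced above, the timeout selection in startChainSyncConnectionThread amounts to the following. This is a hedged sketch, not code from the PR; the argument types are assumed to match chainSyncTimeouts, whose full signature is elided from this diff.

-- Sketch of the timeout selection; SlotLength and Asc are the assumed
-- argument types of chainSyncTimeouts.
pickTimeouts :: SchedulerConfig -> SlotLength -> Asc -> ChainSyncTimeout
pickTimeouts cfg slotLength asc
  | enableTimeouts cfg = chainSyncTimeouts slotLength asc
  | otherwise          = chainSyncNoTimeouts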
@@ -1,6 +1,5 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}

-- | A ChainSync protocol server that allows external scheduling of its
-- operations, while deferring the implementation of the message handler
@@ -15,7 +14,6 @@ module Test.Consensus.PeerSimulator.ScheduledChainSyncServer (

import Control.Tracer (Tracer (Tracer), traceWith)
import Data.Foldable (traverse_)
import Data.Functor (void)
import Ouroboros.Consensus.Block.Abstract (Point (..))
import Ouroboros.Consensus.Util.Condense (Condense (..))
import Ouroboros.Consensus.Util.IOLike (IOLike, MonadSTM (STM),
@@ -35,6 +33,8 @@ data RequestNext =
RollForward (Header TestBlock) (Tip TestBlock)
|
RollBackward (Point TestBlock) (Tip TestBlock)
|
AwaitReply
deriving (Eq, Show)

-- | Pure representation of the messages produced by the handler for the @StIntersect@
@@ -150,11 +150,25 @@ scheduledChainSyncServer server@ScheduledChainSyncServer {scssHandlers, scssTrac
pure $ Left $ SendMsgRollForward header tip go
Just (RollBackward point tip) -> do
trace "done handling MsgRequestNext"
pure $ Left $ (SendMsgRollBackward point tip) go
pure $ Left $ SendMsgRollBackward point tip go
Just AwaitReply -> do
trace "done handling MsgRequestNext"
pure $ Right $ do -- beginning of the continuation
restart >>= \case
-- If we get 'Right', then we still do not have anything to serve
-- and we loop; what 'Right' contains is the continuation starting
-- at 'do' above; by unwrapping the 'Right', we do not send
-- another AwaitReply message (which Typed Protocols does not
-- allow anyway).
Right cont -> cont
Left msg -> pure msg
Nothing -> do
trace " cannot serve at this point; waiting for node state and starting again"
void $ awaitNextState server
recvMsgRequestNext
restart
where
-- Yield control back to the scheduler, then wait for the next state and
-- continue processing the client's current 'MsgRequestNext'.
restart = awaitNextState server *> recvMsgRequestNext

recvMsgFindIntersect pts = do
currentState <- ensureCurrentState server
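
The AwaitReply case above relies on a small continuation trick: the handler may return either a message to send now (Left) or a continuation to run after the next scheduler tick (Right), and unwrapping any further Right ensures the "await" transition happens only once, as the typed protocol requires. Below is a minimal, self-contained sketch of that control flow; all names are hypothetical.

-- 'tryServe' yields a message when one is available; 'awaitNextState'
-- blocks until the scheduler advances. The Right wrapper (the protocol's
-- single "await" transition) is produced at most once.
awaitLoop :: Monad m => m () -> m (Maybe msg) -> m (Either msg (m msg))
awaitLoop awaitNextState tryServe = do
  attempt <- tryServe
  case attempt of
    Just msg -> pure (Left msg)    -- something to serve immediately
    Nothing  -> pure (Right retry) -- enter the await state exactly once
  where
    -- After each tick, try again; loop without re-entering the await state.
    retry = do
      awaitNextState
      attempt <- tryServe
      case attempt of
        Just msg -> pure msg
        Nothing  -> retry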
@@ -29,10 +29,9 @@ mkCdbTracer tracer =
Tracer $ \case
ChainDB.Impl.TraceAddBlockEvent event ->
case event of
AddedToCurrentChain _ NewTipInfo {newTipPoint} _ newFragment -> do
AddedToCurrentChain _ NewTipInfo {newTipPoint} _ _ -> do
trace "Added to current chain"
trace $ "New tip: " ++ condense newTipPoint
trace $ "New fragment: " ++ condense newFragment
Review comment (Contributor): Is this change intended?

Reply (Contributor): yes, that was too noisy and mostly redundant

SwitchedToAFork _ NewTipInfo {newTipPoint} _ newFragment -> do
trace "Switched to a fork"
trace $ "New tip: " ++ condense newTipPoint