From 83b12eec280aac82b50ed0d1214f4751c1f4e70f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20=E2=80=9CNiols=E2=80=9D=20Jeannerod?= Date: Tue, 19 Mar 2024 10:51:12 +0000 Subject: [PATCH] Make the LoP follow the GSM states This also involves some fairly important changes to the leaky bucket and to the leaky bucket API. These changes make the leaky bucket significantly more robust. Co-authored-by: Alexander Esgen --- .../Ouroboros/Consensus/Network/NodeToNode.hs | 3 +- .../Ouroboros/Consensus/Node.hs | 2 +- .../Ouroboros/Consensus/Node/GSM.hs | 34 +- .../Ouroboros/Consensus/NodeKernel.hs | 28 +- .../Test/Consensus/PeerSimulator/ChainSync.hs | 4 +- ouroboros-consensus/ouroboros-consensus.cabal | 2 + .../MiniProtocol/ChainSync/Client.hs | 73 +++- .../MiniProtocol/ChainSync/Client/State.hs | 17 +- .../Ouroboros/Consensus/Node/GsmState.hs | 23 + .../Ouroboros/Consensus/Util/LeakyBucket.hs | 409 +++++++++++++----- .../MiniProtocol/ChainSync/Client.hs | 7 +- .../Consensus/Util/LeakyBucket/Tests.hs | 248 +++++++---- 12 files changed, 575 insertions(+), 275 deletions(-) create mode 100644 ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/GsmState.hs diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs index 4dbf6a9361..abb195eb65 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs @@ -41,7 +41,7 @@ import qualified Codec.CBOR.Decoding as CBOR import Codec.CBOR.Encoding (Encoding) import qualified Codec.CBOR.Encoding as CBOR import Codec.CBOR.Read (DeserialiseFailure) -import Control.Concurrent.Class.MonadSTM.Strict.TVar as TVar.Unchecked +import qualified Control.Concurrent.Class.MonadSTM.Strict.TVar as 
TVar.Unchecked import Control.Monad.Class.MonadTime.SI (MonadTime) import Control.Monad.Class.MonadTimer.SI (MonadTimer) import Control.Tracer @@ -570,6 +570,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucke (contramap (TraceLabelPeer them) (Node.chainSyncClientTracer (getTracers kernel))) (CsClient.defaultChainDbView (getChainDB kernel)) (getChainSyncHandles kernel) + (getGsmState kernel) them version lopBucketConfig diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index b2e0609579..9fc7c83b3c 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -629,7 +629,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = LedgerPeersConsensusInterface { lpGetLatestSlot = getImmTipSlot kernel, lpGetLedgerPeers = fromMaybe [] <$> getPeersFromCurrentLedger kernel (const True), - lpGetLedgerStateJudgement = getLedgerStateJudgement kernel + lpGetLedgerStateJudgement = GSM.gsmStateToLedgerJudgement <$> getGsmState kernel }, Diffusion.daUpdateOutboundConnectionsState = let varOcs = getOutboundConnectionsState kernel in \newOcs -> do diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs index 41cb3aaf24..8c420f370d 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/GSM.hs @@ -19,11 +19,13 @@ module Ouroboros.Consensus.Node.GSM ( -- * Auxiliaries , TraceGsmEvent (..) 
, gsmStateToLedgerJudgement - , initializationLedgerJudgement + , initializationGsmState -- * Constructors , realDurationUntilTooOld , realGsmEntryPoints , realMarkerFileView + -- * Re-exported + , module Ouroboros.Consensus.Node.GsmState ) where import qualified Cardano.Slotting.Slot as Slot @@ -43,6 +45,7 @@ import qualified Ouroboros.Consensus.HardFork.Abstract as HardFork import qualified Ouroboros.Consensus.HardFork.History as HardFork import qualified Ouroboros.Consensus.HardFork.History.Qry as Qry import qualified Ouroboros.Consensus.Ledger.Basics as L +import Ouroboros.Consensus.Node.GsmState import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB) import Ouroboros.Consensus.Util.NormalForm.StrictTVar (StrictTVar) import qualified Ouroboros.Consensus.Util.NormalForm.StrictTVar as StrictSTM @@ -80,19 +83,6 @@ data CandidateVersusSelection = -- ^ Whether the candidate is better than the selection deriving (Eq, Show) --- | Current state of the Genesis State Machine -data GsmState = - PreSyncing - -- ^ We are syncing, and the Honest Availability Assumption is not - -- satisfied. - | - Syncing - -- ^ We are syncing, and the Honest Availability Assumption is satisfied. - | - CaughtUp - -- ^ We are caught-up. - deriving (Eq, Show, Read) - data GsmView m upstreamPeer selection chainSyncState = GsmView { antiThunderingHerd :: Maybe StdGen -- ^ An initial seed used to randomly increase 'minCaughtUpDuration' by up @@ -168,10 +158,10 @@ data GsmEntryPoints m = GsmEntryPoints { ----- --- | Determine the initial 'LedgerStateJudgment' +-- | Determine the initial 'GsmState' -- -- Also initializes the persistent marker file. 
-initializationLedgerJudgement :: +initializationGsmState :: ( L.GetTip (L.LedgerState blk) , Monad m ) @@ -179,23 +169,23 @@ initializationLedgerJudgement :: -> Maybe (WrapDurationUntilTooOld m blk) -- ^ 'Nothing' if @blk@ has no age limit -> MarkerFileView m - -> m LedgerStateJudgement -initializationLedgerJudgement + -> m GsmState +initializationGsmState getCurrentLedger mbDurationUntilTooOld markerFileView = do wasCaughtUp <- hasMarkerFile markerFileView - if not wasCaughtUp then pure TooOld else do + if not wasCaughtUp then pure PreSyncing else do case mbDurationUntilTooOld of - Nothing -> return YoungEnough + Nothing -> return CaughtUp Just wd -> do sno <- L.getTipSlot <$> getCurrentLedger getDurationUntilTooOld wd sno >>= \case - After{} -> return YoungEnough + After{} -> return CaughtUp Already -> do removeMarkerFile markerFileView - return TooOld + return PreSyncing -- | For 'LedgerStateJudgement' as used in the Diffusion layer, there is no -- difference between 'PreSyncing' and 'Syncing'. 
diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index 94d42910a3..b4dd556373 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -80,6 +80,8 @@ import Ouroboros.Consensus.Util.AnchoredFragment (preferAnchoredCandidate) import Ouroboros.Consensus.Util.EarlyExit import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Consensus.Util.LeakyBucket + (atomicallyWithMonotonicTime) import Ouroboros.Consensus.Util.Orphans () import Ouroboros.Consensus.Util.ResourceRegistry import Ouroboros.Consensus.Util.STM @@ -130,9 +132,10 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel { -- , getFetchMode :: STM m FetchMode - -- | The ledger judgement, used by diffusion. + -- | The GSM state, used by diffusion. A ledger judgement can be derived + -- from it with 'GSM.gsmStateToLedgerJudgement'. -- - , getLedgerStateJudgement :: STM m LedgerStateJudgement + , getGsmState :: STM m GSM.GsmState -- | The kill handle and exposed state for each ChainSync client. 
, getChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk)) @@ -206,7 +209,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers , mempool , peerSharingRegistry , varChainSyncHandles - , varLedgerJudgement + , varGsmState } = st varOutboundConnectionsState <- newTVarIO UntrustedState @@ -244,8 +247,11 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers , GSM.setCaughtUpPersistentMark = \upd -> (if upd then GSM.touchMarkerFile else GSM.removeMarkerFile) gsmMarkerFileView - , GSM.writeGsmState = \x -> atomically $ do - writeTVar varLedgerJudgement $ GSM.gsmStateToLedgerJudgement x + , GSM.writeGsmState = \gsmState -> + atomicallyWithMonotonicTime $ \time -> do + writeTVar varGsmState gsmState + handles <- readTVar varChainSyncHandles + traverse_ (($ time) . ($ gsmState) . cschGsmCallback) handles , GSM.isHaaSatisfied = do readTVar varOutboundConnectionsState <&> \case -- See the upstream Haddocks for the exact conditions under @@ -253,7 +259,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers TrustedStateWithExternalPeers -> True UntrustedState -> False } - judgment <- readTVarIO varLedgerJudgement + judgment <- GSM.gsmStateToLedgerJudgement <$> readTVarIO varGsmState void $ forkLinkedThread registry "NodeKernel.GSM" $ case judgment of TooOld -> GSM.enterPreSyncing gsm YoungEnough -> GSM.enterCaughtUp gsm @@ -282,7 +288,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers , getTopLevelConfig = cfg , getFetchClientRegistry = fetchClientRegistry , getFetchMode = readFetchMode blockFetchInterface - , getLedgerStateJudgement = readTVar varLedgerJudgement + , getGsmState = readTVar varGsmState , getChainSyncHandles = varChainSyncHandles , getPeerSharingRegistry = peerSharingRegistry , getTracers = tracers @@ -317,9 +323,9 @@ data InternalState m addrNTN addrNTC blk = IS { , blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m , fetchClientRegistry 
:: FetchClientRegistry (ConnectionId addrNTN) (Header blk) blk m , varChainSyncHandles :: StrictTVar m (Map (ConnectionId addrNTN) (ChainSyncClientHandle m blk)) + , varGsmState :: StrictTVar m GSM.GsmState , mempool :: Mempool m blk , peerSharingRegistry :: PeerSharingRegistry addrNTN m - , varLedgerJudgement :: StrictTVar m LedgerStateJudgement } initInternalState :: @@ -336,9 +342,9 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg , mempoolCapacityOverride , gsmArgs, getUseBootstrapPeers } = do - varLedgerJudgement <- do + varGsmState <- do let GsmNodeKernelArgs {..} = gsmArgs - j <- GSM.initializationLedgerJudgement + j <- GSM.initializationGsmState (atomically $ ledgerState <$> ChainDB.getCurrentLedger chainDB) gsmDurationUntilTooOld gsmMarkerFileView @@ -362,7 +368,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg btime (ChainDB.getCurrentChain chainDB) getUseBootstrapPeers - (readTVar varLedgerJudgement) + (GSM.gsmStateToLedgerJudgement <$> readTVar varGsmState) blockFetchInterface :: BlockFetchConsensusInterface (ConnectionId addrNTN) (Header blk) blk m blockFetchInterface = BlockFetchClientInterface.mkBlockFetchConsensusInterface (configBlock cfg) diff --git a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/ChainSync.hs b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/ChainSync.hs index 21894f8811..2197bea732 100644 --- a/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/ChainSync.hs +++ b/ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/ChainSync.hs @@ -26,6 +26,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client Consensus, bracketChainSyncClient, chainSyncClient) import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck +import Ouroboros.Consensus.Node.GsmState 
(GsmState (Syncing)) import Ouroboros.Consensus.Util (ShowProxy) import Ouroboros.Consensus.Util.IOLike (Exception (fromException), IOLike, MonadCatch (try), StrictTVar) @@ -139,11 +140,12 @@ runChainSyncClient csjConfig StateViewTracers {svtPeerSimulatorResultsTracer} varHandles - channel = do + channel = bracketChainSyncClient nullTracer chainDbView varHandles + (pure Syncing) peerId (maxBound :: NodeToNodeVersion) lopBucketConfig diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 93a5b01352..24affe240a 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -175,6 +175,7 @@ library Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server + Ouroboros.Consensus.Node.GsmState Ouroboros.Consensus.Node.InitStorage Ouroboros.Consensus.Node.NetworkProtocolVersion Ouroboros.Consensus.Node.ProtocolInfo @@ -590,6 +591,7 @@ test-suite infra-test build-depends: QuickCheck, base, + io-classes, io-sim, mtl, ouroboros-consensus:{ouroboros-consensus, unstable-consensus-testlib}, diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs index 6d23553141..8a08bff1d6 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs @@ -74,6 +74,7 @@ module Ouroboros.Consensus.MiniProtocol.ChainSync.Client ( ) where import Control.Monad (join, void) +import Control.Monad.Class.MonadTimer (MonadTimer) import Control.Monad.Except (runExcept, throwError) import Control.Tracer import Data.Kind (Type) @@ -102,6 +103,7 @@ import 
Ouroboros.Consensus.Ledger.SupportsProtocol import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping as Jumping import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State +import Ouroboros.Consensus.Node.GsmState (GsmState (..)) import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.Protocol.Abstract import Ouroboros.Consensus.Storage.ChainDB (ChainDB, @@ -113,6 +115,8 @@ import Ouroboros.Consensus.Util.Assert (assertWithMsg) import Ouroboros.Consensus.Util.EarlyExit (WithEarlyExit, exitEarly) import qualified Ouroboros.Consensus.Util.EarlyExit as EarlyExit import Ouroboros.Consensus.Util.IOLike hiding (handle) +import Ouroboros.Consensus.Util.LeakyBucket + (atomicallyWithMonotonicTime) import qualified Ouroboros.Consensus.Util.LeakyBucket as LeakyBucket import Ouroboros.Consensus.Util.STM (Fingerprint, Watcher (..), WithFingerprint (..), withWatcher) @@ -313,16 +317,20 @@ deriving anyclass instance ( NoThunks (Header blk) ) => NoThunks (ChainSyncStateView m blk) -bracketChainSyncClient :: forall m peer blk a. +bracketChainSyncClient :: + forall m peer blk a. ( IOLike m , Ord peer , LedgerSupportsProtocol blk + , MonadTimer m ) => Tracer m (TraceChainSyncClientEvent blk) -> ChainDbView m blk -> StrictTVar m (Map peer (ChainSyncClientHandle m blk)) -- ^ The kill handle and states for each peer, we need the whole map because we -- (de)register nodes (@peer@). + -> STM m GsmState + -- ^ A function giving the current GSM state; only used at startup. 
-> peer -> NodeToNodeVersion -> ChainSyncLoPBucketConfig @@ -333,19 +341,21 @@ bracketChainSyncClient tracer ChainDbView { getIsInvalidBlock } varHandles + getGsmState peer version csBucketConfig csjConfig body - = mkChainSyncClientHandleState >>= \csHandleState -> - withCSJCallbacks csHandleState csjConfig $ \csjCallbacks -> + = + LeakyBucket.execAgainstBucket' + $ \lopBucket -> + mkChainSyncClientHandleState >>= \csHandleState -> + withCSJCallbacks lopBucket csHandleState csjConfig $ \csjCallbacks -> withWatcher "ChainSync.Client.rejectInvalidBlocks" (invalidBlockWatcher csHandleState) - $ LeakyBucket.execAgainstBucket lopBucketConfig - $ \lopBucket -> - body ChainSyncStateView { + $ body ChainSyncStateView { csvSetCandidate = modifyTVar csHandleState . \ c s -> s {csCandidate = c} , csvSetLatestSlot = @@ -355,9 +365,9 @@ bracketChainSyncClient , idlingStop = atomically $ modifyTVar csHandleState $ \ s -> s {csIdling = False} } , csvLoPBucket = LoPBucket { - lbPause = LeakyBucket.setPaused lopBucket True - , lbResume = LeakyBucket.setPaused lopBucket False - , lbGrantToken = void $ LeakyBucket.fill lopBucket 1 + lbPause = LeakyBucket.setPaused' lopBucket True + , lbResume = LeakyBucket.setPaused' lopBucket False + , lbGrantToken = void $ LeakyBucket.fill' lopBucket 1 } , csvJumping = csjCallbacks } @@ -370,34 +380,43 @@ bracketChainSyncClient } withCSJCallbacks :: + LeakyBucket.Handlers m -> StrictTVar m (ChainSyncState blk) -> CSJConfig -> (Jumping.Jumping m blk -> m a) -> m a - withCSJCallbacks cschState CSJDisabled f = do + withCSJCallbacks lopBucket cschState CSJDisabled f = do tid <- myThreadId cschJumpInfo <- newTVarIO Nothing cschJumping <- newTVarIO (Disengaged DisengagedDone) let handle = ChainSyncClientHandle { cschGDDKill = throwTo tid DensityTooLow + , cschGsmCallback = updateLopBucketConfig lopBucket , cschState , cschJumping , cschJumpInfo } - insertHandle = atomically $ modifyTVar varHandles $ Map.insert peer handle + insertHandle = 
atomicallyWithMonotonicTime $ \time -> do + initialGsmState <- getGsmState + updateLopBucketConfig lopBucket initialGsmState time + modifyTVar varHandles $ Map.insert peer handle deleteHandle = atomically $ modifyTVar varHandles $ Map.delete peer bracket_ insertHandle deleteHandle $ f Jumping.noJumping - withCSJCallbacks csHandleState (CSJEnabled csjEnabledConfig) f = - bracket (acquireContext csHandleState csjEnabledConfig) releaseContext $ \peerContext -> + withCSJCallbacks lopBucket csHandleState (CSJEnabled csjEnabledConfig) f = + bracket (acquireContext lopBucket csHandleState csjEnabledConfig) releaseContext $ \peerContext -> f $ Jumping.mkJumping peerContext - acquireContext cschState (CSJEnabledConfig jumpSize) = do + + acquireContext lopBucket cschState (CSJEnabledConfig jumpSize) = do tid <- myThreadId - atomically $ do + atomicallyWithMonotonicTime $ \time -> do + initialGsmState <- getGsmState + updateLopBucketConfig lopBucket initialGsmState time cschJumpInfo <- newTVar Nothing context <- Jumping.makeContext varHandles jumpSize Jumping.registerClient context peer cschState $ \cschJumping -> ChainSyncClientHandle { cschGDDKill = throwTo tid DensityTooLow + , cschGsmCallback = updateLopBucketConfig lopBucket , cschState , cschJumping , cschJumpInfo @@ -409,19 +428,33 @@ bracketChainSyncClient invalidBlockRejector tracer version getIsInvalidBlock (csCandidate <$> readTVar varState) + -- | Update the configuration of the bucket to match the given GSM state. + -- NOTE: The new level is currently the maximal capacity of the bucket; + -- maybe we want to change that later. + updateLopBucketConfig :: LeakyBucket.Handlers m -> GsmState -> Time -> STM m () + updateLopBucketConfig lopBucket gsmState = + LeakyBucket.updateConfig lopBucket $ \_ -> + let config = lopBucketConfig gsmState in + (LeakyBucket.capacity config, config) + -- | Wrapper around 'LeakyBucket.execAgainstBucket' that handles the -- disabled bucket by running the given action with dummy handlers. 
- lopBucketConfig :: LeakyBucket.Config m - lopBucketConfig = - case csBucketConfig of - ChainSyncLoPBucketDisabled -> LeakyBucket.dummyConfig - ChainSyncLoPBucketEnabled ChainSyncLoPBucketEnabledConfig {csbcCapacity, csbcRate} -> + lopBucketConfig :: GsmState -> LeakyBucket.Config m + lopBucketConfig gsmState = + case (gsmState, csBucketConfig) of + (Syncing, ChainSyncLoPBucketEnabled ChainSyncLoPBucketEnabledConfig {csbcCapacity, csbcRate}) -> LeakyBucket.Config { capacity = fromInteger $ csbcCapacity, rate = csbcRate, onEmpty = throwIO EmptyBucket, fillOnOverflow = True } + -- NOTE: If we decide to slow the bucket down when “almost caught-up”, + -- we should add a state to the GSM and corresponding configuration + -- fields and a bucket config here. + (_, ChainSyncLoPBucketDisabled) -> LeakyBucket.dummyConfig + (PreSyncing, ChainSyncLoPBucketEnabled _) -> LeakyBucket.dummyConfig + (CaughtUp, ChainSyncLoPBucketEnabled _) -> LeakyBucket.dummyConfig -- Our task: after connecting to an upstream node, try to maintain an -- up-to-date header-only fragment representing their chain. 
We maintain diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/State.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/State.hs index 189ee361e5..c75e3d3530 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/State.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/State.hs @@ -27,8 +27,9 @@ import Ouroboros.Consensus.Block (HasHeader, Header, Point) import Ouroboros.Consensus.HeaderStateHistory (HeaderStateHistory) import Ouroboros.Consensus.Ledger.SupportsProtocol (LedgerSupportsProtocol) -import Ouroboros.Consensus.Util.IOLike (IOLike, NoThunks (..), - StrictTVar) +import Ouroboros.Consensus.Node.GsmState (GsmState) +import Ouroboros.Consensus.Util.IOLike (IOLike, NoThunks (..), STM, + StrictTVar, Time) import Ouroboros.Network.AnchoredFragment (AnchoredFragment, headPoint) @@ -69,17 +70,21 @@ deriving anyclass instance ( -- the GDD governor. data ChainSyncClientHandle m blk = ChainSyncClientHandle { -- | Disconnects from the peer when the GDD considers it adversarial - cschGDDKill :: !(m ()) + cschGDDKill :: !(m ()) + + -- | Callback called by the GSM when the GSM state changes. It takes the new + -- state and the current time and should execute rapidly. Used to enable/disable the LoP. + , cschGsmCallback :: !(GsmState -> Time -> STM m ()) -- | Data shared between the client and external components like GDD. - , cschState :: !(StrictTVar m (ChainSyncState blk)) + , cschState :: !(StrictTVar m (ChainSyncState blk)) -- | The state of the peer with respect to ChainSync jumping. - , cschJumping :: !(StrictTVar m (ChainSyncJumpingState m blk)) + , cschJumping :: !(StrictTVar m (ChainSyncJumpingState m blk)) -- | ChainSync state needed to jump to the tip of the candidate fragment of -- the peer.
- , cschJumpInfo :: !(StrictTVar m (Maybe (JumpInfo blk))) + , cschJumpInfo :: !(StrictTVar m (Maybe (JumpInfo blk))) } deriving stock (Generic) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/GsmState.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/GsmState.hs new file mode 100644 index 0000000000..1e3405645b --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/GsmState.hs @@ -0,0 +1,23 @@ +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} + +-- | This module contains the definition of a state in the Genesis State Machine +-- (GSM). The GSM itself is defined in 'ouroboros-consensus-diffusion', but the +-- ChainSync client relies on its state. +module Ouroboros.Consensus.Node.GsmState (GsmState (..)) where + +import GHC.Generics (Generic) +import NoThunks.Class (NoThunks) + +-- | Current state of the Genesis State Machine +data GsmState = + PreSyncing + -- ^ We are syncing, and the Honest Availability Assumption is not + -- satisfied. + | + Syncing + -- ^ We are syncing, and the Honest Availability Assumption is satisfied. + | + CaughtUp + -- ^ We are caught-up. + deriving (Eq, Show, Read, Generic, NoThunks) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/LeakyBucket.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/LeakyBucket.hs index 41b575c128..b4951bb7d3 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/LeakyBucket.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/LeakyBucket.hs @@ -30,18 +30,30 @@ module Ouroboros.Consensus.Util.LeakyBucket ( Config (..) , Handlers (..) , State (..) 
+ , atomicallyWithMonotonicTime , diffTimeToSecondsRational , dummyConfig , evalAgainstBucket , execAgainstBucket + , execAgainstBucket' + , fill' + , microsecondsPerSecond + , picosecondsPerSecond , runAgainstBucket , secondsRationalToDiffTime + , setPaused' + , updateConfig' ) where +import Control.Monad (void, when) +import qualified Control.Monad.Class.MonadSTM.Internal as TVar +import Control.Monad.Class.MonadTimer (MonadTimer, registerDelay) +import Control.Monad.Class.MonadTimer.SI (diffTimeToMicrosecondsAsInt) import Data.Ratio ((%)) import Data.Time.Clock (diffTimeToPicoseconds) import GHC.Generics (Generic) -import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Consensus.Util.IOLike hiding (killThread) +import Ouroboros.Consensus.Util.STM (blockUntilChanged) import Prelude hiding (init) -- | Configuration of a leaky bucket. @@ -57,7 +69,7 @@ data Config m = Config } deriving (Generic) -deriving instance NoThunks (m ()) => NoThunks (Config m) +deriving instance (NoThunks (m ())) => NoThunks (Config m) -- | A configuration for a bucket that does nothing. dummyConfig :: (Applicative m) => Config m @@ -70,118 +82,197 @@ dummyConfig = } -- | State of a leaky bucket, giving the level and the associated time. -data State cfg = State - { level :: !Rational, - time :: !Time, - paused :: !Bool, - config :: !cfg +data State m = State + { level :: !Rational, + time :: !Time, + paused :: !Bool, + configGeneration :: !Int, + config :: !(Config m) } - deriving (Eq, Show, Generic, NoThunks) + deriving (Generic) + +deriving instance (NoThunks (m ())) => NoThunks (State m) --- | A bucket is simply a TVar of a state. -type Bucket m = StrictTVar m (State (Config m)) +-- | A bucket is simply a TVar of a state. The state carries a 'Config' and an +-- integer (a “generation”) to detect changes in the configuration. +type Bucket m = StrictTVar m (State m) -- | Whether filling the bucket overflew. 
data FillResult = Overflew | DidNotOverflow -- | The handlers to a bucket: contains the API to interact with a running --- bucket. +-- bucket. All the endpoints are STM but require the current time; the easy way +-- to provide it is 'atomicallyWithMonotonicTime'. data Handlers m = Handlers { -- | Refill the bucket by the given amount and returns whether the bucket -- overflew. The bucket may silently get filled to full capacity or not get -- filled depending on 'fillOnOverflow'. - fill :: !(Rational -> m FillResult), + fill :: + !( Rational -> + Time -> + STM m FillResult + ), -- | Pause or resume the bucket. Pausing stops the bucket from leaking until -- it is resumed. It is still possible to fill it during that time. @setPaused -- True@ and @setPaused False@ are idempotent. - setPaused :: !(Bool -> m ()), - -- | Dynamically update the configuration of the bucket. - updateConfig :: !((Config m -> Config m) -> m ()) + setPaused :: + !( Bool -> + Time -> + STM m () + ), + -- | Dynamically update the level and configuration of the bucket. Updating + -- the level matters if the capacity changes, in particular. If updating + -- leaves the bucket empty, the action is triggered immediately. + updateConfig :: + !( ((Rational, Config m) -> (Rational, Config m)) -> + Time -> + STM m () + ) } +-- | Variant of 'fill' already wrapped in 'atomicallyWithMonotonicTime'. +fill' :: + ( MonadMonotonicTime m, + MonadSTM m + ) => + Handlers m -> + Rational -> + m FillResult +fill' h r = atomicallyWithMonotonicTime $ fill h r + +-- | Variant of 'setPaused' already wrapped in 'atomicallyWithMonotonicTime'. +setPaused' :: + ( MonadMonotonicTime m, + MonadSTM m + ) => + Handlers m -> + Bool -> + m () +setPaused' h p = atomicallyWithMonotonicTime $ setPaused h p + +-- | Variant of 'updateConfig' already wrapped in 'atomicallyWithMonotonicTime'.
+updateConfig' :: + ( MonadMonotonicTime m, + MonadSTM m + ) => + Handlers m -> + ((Rational, Config m) -> (Rational, Config m)) -> + m () +updateConfig' h f = atomicallyWithMonotonicTime $ updateConfig h f + -- | Create a bucket with the given configuration, then run the action against -- that bucket. Returns when the action terminates or the bucket empties. In the -- first case, return the value returned by the action. In the second case, -- return @Nothing@. execAgainstBucket :: - (MonadDelay m, MonadAsync m, MonadFork m, MonadMask m, NoThunks (m ())) => + ( MonadDelay m, + MonadAsync m, + MonadFork m, + MonadMask m, + MonadTimer m, + NoThunks (m ()) + ) => Config m -> (Handlers m -> m a) -> m a execAgainstBucket config action = snd <$> runAgainstBucket config action +-- | Variant of 'execAgainstBucket' that uses a dummy configuration. This only +-- makes sense for actions that use 'updateConfig'. +execAgainstBucket' :: + ( MonadDelay m, + MonadAsync m, + MonadFork m, + MonadMask m, + MonadTimer m, + NoThunks (m ()) + ) => + (Handlers m -> m a) -> + m a +execAgainstBucket' action = + execAgainstBucket dummyConfig action + -- | Same as 'execAgainstBucket' but returns the 'State' of the bucket when the -- action terminates. Exposed for testing purposes. evalAgainstBucket :: - (MonadDelay m, MonadAsync m, MonadFork m, MonadMask m, NoThunks (m ())) => + (MonadDelay m, MonadAsync m, MonadFork m, MonadMask m, MonadTimer m, NoThunks (m ()) + ) => Config m -> (Handlers m -> m a) -> - m (State (Config m)) + m (State m) evalAgainstBucket config action = fst <$> runAgainstBucket config action -- | Same as 'execAgainstBucket' but also returns the 'State' of the bucket when -- the action terminates. Exposed for testing purposes. runAgainstBucket :: forall m a. 
- (MonadDelay m, MonadAsync m, MonadFork m, MonadMask m, NoThunks (m ())) => + ( MonadDelay m, + MonadAsync m, + MonadFork m, + MonadMask m, + MonadTimer m, + NoThunks (m ()) + ) => Config m -> (Handlers m -> m a) -> - m (State (Config m), a) + m (State m, a) runAgainstBucket config action = do - bucket <- init config + runThreadVar <- atomically newEmptyTMVar -- see note [Leaky bucket design]. tid <- myThreadId - killThreadVar <- newTVarIO Nothing - finally - ( do - startThread killThreadVar bucket tid - result <- - action $ - Handlers - { fill = (snd <$>) . snapshotFill bucket, - setPaused = setPaused bucket, - updateConfig = updateConfig killThreadVar bucket tid - } - state <- snapshot bucket - pure (state, result) - ) - (stopThread killThreadVar) + bucket <- init config + withAsync (leak runThreadVar tid bucket) $ \_ -> do + atomicallyWithMonotonicTime $ maybeStartThread Nothing runThreadVar bucket + result <- + action $ + Handlers + { fill = \r t -> (snd <$>) $ snapshotFill bucket r t, + setPaused = setPaused bucket, + updateConfig = updateConfig runThreadVar bucket + } + state <- atomicallyWithMonotonicTime $ snapshot bucket + pure (state, result) where - startThread :: - StrictTVar m (Maybe (m ())) -> - Bucket m -> - ThreadId m -> - m () - startThread killThreadVar bucket tid = - readTVarIO killThreadVar >>= \case - Just _ -> error "LeakyBucket: startThread called when a thread is already running" - Nothing -> (atomically . writeTVar killThreadVar) =<< leak bucket tid - - stopThread :: StrictTVar m (Maybe (m ())) -> m () - stopThread killThreadVar = - readTVarIO killThreadVar >>= \case - Just killThread' -> killThread' - Nothing -> pure () + -- Start the thread (that is, write to its 'runThreadVar') if it is useful. + -- Takes a potential old value of the 'runThreadVar' as first argument, + -- which will be increased to help differentiate between restarts. 
+ maybeStartThread :: Maybe Int -> StrictTMVar m Int -> Bucket m -> Time -> STM m () + maybeStartThread oldRunThread runThreadVar bucket time = do + State {config = Config {rate}} <- snapshot bucket time + when (rate > 0) $ void $ tryPutTMVar runThreadVar $ maybe 0 (+ 1) oldRunThread - setPaused :: Bucket m -> Bool -> m () - setPaused bucket paused = do - newState <- snapshot bucket - atomically $ writeTVar bucket newState {paused} + setPaused :: Bucket m -> Bool -> Time -> STM m () + setPaused bucket paused time = do + newState <- snapshot bucket time + writeTVar bucket newState {paused} updateConfig :: - StrictTVar m (Maybe (m ())) -> + StrictTMVar m Int -> Bucket m -> - ThreadId m -> - (Config m -> Config m) -> - m () - updateConfig killThreadVar bucket tid = \f -> do - State {level, time, paused, config = oldConfig} <- snapshot bucket - let newConfig@Config {capacity = newCapacity, rate = newRate} = f oldConfig - newLevel = min newCapacity level - if - | newRate <= 0 -> stopThread killThreadVar - | newRate > rate oldConfig -> stopThread killThreadVar >> startThread killThreadVar bucket tid - | otherwise -> pure () - atomically $ writeTVar bucket State {level = newLevel, time, paused, config = newConfig} + ((Rational, Config m) -> (Rational, Config m)) -> + Time -> + STM m () + updateConfig runThreadVar bucket f time = do + State + { level = oldLevel, + paused, + configGeneration = oldConfigGeneration, + config = oldConfig + } <- + snapshot bucket time + let (newLevel, newConfig) = f (oldLevel, oldConfig) + Config {capacity = newCapacity} = newConfig + newLevel' = clamp (0, newCapacity) newLevel + writeTVar bucket $ + State + { level = newLevel', + time, + paused, + configGeneration = oldConfigGeneration + 1, + config = newConfig + } + -- Ensure that 'runThreadVar' is empty, then maybe start the thread. + oldRunThread <- tryTakeTMVar runThreadVar + maybeStartThread oldRunThread runThreadVar bucket time -- | Initialise a bucket given a configuration. 
The bucket starts full at the -- time where one calls 'init'. @@ -191,81 +282,163 @@ init :: m (Bucket m) init config@Config {capacity} = do time <- getMonotonicTime - newTVarIO $ State {time, level = capacity, paused = False, config} + newTVarIO $ + State + { time, + level = capacity, + paused = False, + configGeneration = 0, + config = config + } + +-- Note [Leaky bucket design] +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~ +-- +-- The leaky bucket works by running the given action against a thread that +-- makes the bucket leak. Since it would be extremely inefficient to actually +-- remove tokens one by one from the bucket, the 'leak' thread instead looks at +-- the current state of the bucket, computes how much time it would take for the +-- bucket to empty, and then waits that amount of time. Once the wait is over, it +-- recurses, looks at the new state of the bucket, etc. If tokens were given to +-- the bucket via the action, the bucket is not empty and the loop continues. +-- +-- This description assumes that two things hold: +-- +-- - the bucket must be leaking (ie. rate is strictly positive), +-- - the action can only increase the waiting time (eg. by giving tokens). +-- +-- Neither of those properties holds in the general case. Indeed, it is possible +-- for the bucket to have a zero rate or even a negative one (for a more +-- traditional rate limiting bucket, for instance). Conversely, it is possible +-- for the action to lower the waiting time by changing the bucket configuration +-- to one where the rate is higher. +-- +-- We fix both of those issues with one mechanism, the “runThreadVar”. It is an +-- MVar containing an integer that tells the thread whether it should be +-- running. An empty MVar means that the thread should not be running, for +-- instance if the rate is null. A full MVar (no matter what the integer is) +-- means that the thread should be running. When recursing, the thread blocks +-- until the MVar is full, and only then proceeds as described above.
+-- Additionally, while waiting for the bucket to empty, the thread monitors +-- changes to the MVar, indicating either that the thread should stop running or +-- that the configuration changed so that it might have to wait less long. The +-- change in configuration is detected by changes in the integer. +-- +-- Note that we call “start”/“stop” running the action of filling/emptying the +-- MVar. This is not to be mistaken for the thread actually being spawned/killed. -- | Monadic action that calls 'threadDelay' until the bucket is empty, then --- returns @()@. It receives the 'ThreadId' argument of the action's thread, --- which it uses to throw exceptions at it; it returns a monadic action that can --- be used to interrupt the thread from the outside. +-- runs 'onEmpty' and awaits a config change. See note [Leaky bucket design]. leak :: - (MonadDelay m, MonadCatch m, MonadFork m, MonadAsync m) => - Bucket m -> + ( MonadDelay m, + MonadCatch m, + MonadFork m, + MonadAsync m, + MonadTimer m + ) => + -- | A variable indicating whether the thread should run (when it is filled) + -- or not (otherwise). The integer it carries only helps in differentiating + -- between starts and restarts. 'leak' does not modify this variable. + StrictTMVar m Int -> + -- | The 'ThreadId' of the action's thread, which is used to throw exceptions + -- at it. ThreadId m -> - m (Maybe (m ())) -leak bucket actionThreadId = do - State {config = Config {rate}} <- snapshot bucket - if rate <= 0 - then pure Nothing - else do - a <- async go - pure $ Just $! uninterruptibleCancel a + Bucket m -> + m () +leak runThreadVar actionThreadId bucket = go where go = do - State {level, config = Config {rate, onEmpty}} <- snapshot bucket + -- Block until we are allowed to run. Do not modify the TMVar.
+ oldRunThread <- atomically $ readTMVar runThreadVar + -- NOTE: It is tempting to group this @atomically@ and + -- @atomicallyWithMonotonicTime@ into one; however, because the former is + -- blocking, the latter could get a _very_ inaccurate time, which we + -- cannot afford. + State {level, configGeneration = oldConfigGeneration, config = Config {rate, onEmpty}} <- + atomicallyWithMonotonicTime $ snapshot bucket let timeToWait = secondsRationalToDiffTime (level / rate) - -- NOTE: It is possible that @timeToWait == 0@ while @level > 0@ when @level@ - -- is so tiny that @level / rate@ rounds down to 0 picoseconds. In that case, - -- it is safe to assume that it is just zero. - if level <= 0 || timeToWait == 0 - then handle (\(e :: SomeException) -> throwTo actionThreadId e) onEmpty - else threadDelay timeToWait >> go + timeToWaitMicroseconds = diffTimeToMicrosecondsAsInt timeToWait + -- NOTE: It is possible that @timeToWait <= 1µs@ while @level > 0@ when + -- @level@ is extremely small. + if level <= 0 || timeToWaitMicroseconds <= 0 + then do + handle (\(e :: SomeException) -> throwTo actionThreadId e) onEmpty + -- We have run the action on empty, there is nothing left to do, + -- unless someone changes the configuration. + void $ atomically $ blockUntilChanged configGeneration oldConfigGeneration $ readTVar bucket + go + else do + -- Wait for the bucket to empty, or for the thread to be stopped or + -- restarted. Beware not to call 'registerDelay' with argument 0, that + -- is ensure that @timeToWaitMicroseconds > 0@. + varTimeout <- registerDelay timeToWaitMicroseconds + atomically $ + (check =<< TVar.readTVar varTimeout) + `orElse` + (void $ blockUntilChanged id (Just oldRunThread) $ tryReadTMVar runThreadVar) + go -- | Take a snapshot of the bucket, that is compute its state at the current -- time. 
snapshot :: - (MonadSTM m, MonadMonotonicTime m) => + ( MonadSTM m + ) => Bucket m -> - m (State (Config m)) -snapshot bucket = fst <$> snapshotFill bucket 0 + Time -> + STM m (State m) +snapshot bucket newTime = fst <$> snapshotFill bucket 0 newTime -- | Same as 'snapshot' but also adds the given quantity to the resulting -- level and returns whether this action overflew the bucket. -- -- REVIEW: What to do when 'toAdd' is negative? --- --- REVIEW: Really, this should all be an STM transaction. Now there is the risk --- that two snapshot-taking transactions interleave with the time measurement to --- get a slightly imprecise state (which is not the worst because everything --- should happen very fast). There is also the bigger risk that when we snapshot --- and then do something (eg. in the 'setPaused' handler) we interleave with --- something else. It cannot easily be an STM transaction, though, because we --- need to measure the time, and @io-classes@'s STM does not allow running IO in --- an STM. 
snapshotFill :: - (MonadSTM m, MonadMonotonicTime m) => + ( MonadSTM m + ) => Bucket m -> Rational -> - m (State (Config m), FillResult) -snapshotFill bucket toAdd = do - newTime <- getMonotonicTime - atomically $ do - State {level, time, paused, config} <- readTVar bucket - let Config {rate, capacity, fillOnOverflow} = config - elapsed = diffTime newTime time - leaked = if paused then 0 else (diffTimeToSecondsRational elapsed * rate) - levelLeaked = max 0 (level - leaked) - levelFilled = min capacity (levelLeaked + toAdd) - overflew = levelLeaked + toAdd > capacity - newLevel = if not overflew || fillOnOverflow then levelFilled else levelLeaked - newState = State {time = newTime, level = newLevel, paused, config} - writeTVar bucket newState - pure (newState, if overflew then Overflew else DidNotOverflow) + Time -> + STM m (State m, FillResult) +snapshotFill bucket toAdd newTime = do + State {level, time, paused, configGeneration, config = config} <- readTVar bucket + let Config {rate, capacity, fillOnOverflow} = config + elapsed = diffTime newTime time + leaked = if paused then 0 else (diffTimeToSecondsRational elapsed * rate) + levelLeaked = clamp (0, capacity) (level - leaked) + levelFilled = clamp (0, capacity) (levelLeaked + toAdd) + overflew = levelLeaked + toAdd > capacity + newLevel = if not overflew || fillOnOverflow then levelFilled else levelLeaked + newState = State {time = newTime, level = newLevel, paused, configGeneration, config} + writeTVar bucket newState + pure (newState, if overflew then Overflew else DidNotOverflow) -- | Convert a 'DiffTime' to a 'Rational' number of seconds. This is similar to -- 'diffTimeToSeconds' but with picoseconds precision. diffTimeToSecondsRational :: DiffTime -> Rational -diffTimeToSecondsRational = (% 1_000_000_000_000) . diffTimeToPicoseconds +diffTimeToSecondsRational = (% picosecondsPerSecond) . diffTimeToPicoseconds -- | Alias of 'realToFrac' to make code more readable and typing more explicit. 
secondsRationalToDiffTime :: Rational -> DiffTime secondsRationalToDiffTime = realToFrac + +-- | Helper around 'getMonotonicTime' and 'atomically'. +atomicallyWithMonotonicTime :: + ( MonadMonotonicTime m, + MonadSTM m + ) => + (Time -> STM m b) -> + m b +atomicallyWithMonotonicTime f = + atomically . f =<< getMonotonicTime + +-- NOTE: Needed for GHC 8 +clamp :: Ord a => (a, a) -> a -> a +clamp (low, high) x = min high (max low x) + +-- | Number of microseconds in a second (@10^6@). +microsecondsPerSecond :: Integer +microsecondsPerSecond = 1_000_000 + +-- | Number of picoseconds in a second (@10^12@). +picosecondsPerSecond :: Integer +picosecondsPerSecond = 1_000_000_000_000 diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs index ff05b517fb..868ca695dd 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs @@ -54,6 +54,7 @@ import Cardano.Slotting.Slot (WithOrigin (..)) import Control.Monad (forM_, unless, void, when) import Control.Monad.Class.MonadThrow (Handler (..), catches) import Control.Monad.Class.MonadTime (MonadTime, getCurrentTime) +import Control.Monad.Class.MonadTimer (MonadTimer) import Control.Monad.IOSim (runSimOrThrow) import Control.Tracer (contramap, contramapM, nullTracer) import Data.DerivingVia (InstantiatedAt (InstantiatedAt)) @@ -88,6 +89,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client TraceChainSyncClientEvent (..), bracketChainSyncClient, chainSyncClient, chainSyncStateFor, viewChainSyncState) import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck +import Ouroboros.Consensus.Node.GsmState (GsmState (Syncing)) import Ouroboros.Consensus.Node.NetworkProtocolVersion (NodeToNodeVersion) import 
Ouroboros.Consensus.Node.ProtocolInfo @@ -309,7 +311,7 @@ data ChainSyncOutcome = ChainSyncOutcome { -- Note that updates that are scheduled before the time at which we start -- syncing help generate different chains to start syncing from. runChainSync :: - forall m. (IOLike m, MonadTime m) + forall m. (IOLike m, MonadTime m, MonadTimer m) => ClockSkew -> SecurityParam -> ClientUpdates @@ -500,6 +502,9 @@ runChainSync skew securityParam (ClientUpdates clientUpdates) chainSyncTracer chainDbView varHandles + -- 'Syncing' only ever impacts the LoP, which is disabled in + -- this test, so any value would do. + (pure Syncing) serverId maxBound lopBucketConfig diff --git a/ouroboros-consensus/test/infra-test/Test/Ouroboros/Consensus/Util/LeakyBucket/Tests.hs b/ouroboros-consensus/test/infra-test/Test/Ouroboros/Consensus/Util/LeakyBucket/Tests.hs index 8157045a45..aad4c13f0b 100644 --- a/ouroboros-consensus/test/infra-test/Test/Ouroboros/Consensus/Util/LeakyBucket/Tests.hs +++ b/ouroboros-consensus/test/infra-test/Test/Ouroboros/Consensus/Util/LeakyBucket/Tests.hs @@ -12,19 +12,23 @@ module Test.Ouroboros.Consensus.Util.LeakyBucket.Tests (tests) where import Control.Monad (foldM, void) +import Control.Monad.Class.MonadTimer (MonadTimer) import Control.Monad.IOSim (IOSim, runSimOrThrow) import Data.Either (isLeft, isRight) import Data.Functor ((<&>)) +import Data.List (intersperse) import Data.Ratio ((%)) import Data.Time.Clock (DiffTime, picosecondsToDiffTime) import Ouroboros.Consensus.Util.IOLike (Exception (displayException), MonadAsync, MonadCatch (try), MonadDelay, MonadFork, - MonadMask, MonadThrow (throwIO), NoThunks, SomeException, - Time (Time), addTime, fromException, threadDelay) + MonadMask, MonadSTM, MonadThrow (throwIO), NoThunks, + SomeException, Time (Time), addTime, fromException, + threadDelay) import Ouroboros.Consensus.Util.LeakyBucket import Test.QuickCheck (Arbitrary (arbitrary), Gen, Property, - classify, counterexample, forAll, frequency, 
ioProperty, - listOf1, scale, suchThat, (===)) + classify, counterexample, forAllShrinkBlind, frequency, + ioProperty, liftArbitrary2, listOf1, scale, shrinkList, + suchThat) import Test.Tasty (TestTree, testGroup) import Test.Tasty.QuickCheck (property, testProperty) import Test.Util.TestEnv (adjustQuickCheckTests) @@ -36,8 +40,8 @@ tests = testGroup "Ouroboros.Consensus.Util.LeakyBucket" [ testProperty "play too long harmless" prop_playTooLongHarmless, testProperty "play with pause" prop_playWithPause, testProperty "play with pause too long" prop_playWithPauseTooLong, - testProperty "wait almost too long" (prop_noRefill (-1)), - testProperty "wait just too long" (prop_noRefill 1), + testProperty "wait almost too long" (prop_noRefill False), + testProperty "wait just too long" (prop_noRefill True), testProperty "propagates exceptions" prop_propagateExceptions, testProperty "propagates exceptions (IO)" prop_propagateExceptionsIO, testProperty "catch exception" prop_catchException, @@ -80,6 +84,13 @@ data TestConfig = TestConfig } deriving (Eq, Show) +data TestState = TestState + { testLevel :: Rational, + testTime :: Time, + testPaused :: Bool + } + deriving (Eq, Show) + instance Arbitrary TestConfig where arbitrary = TestConfig @@ -107,38 +118,59 @@ mkConfig TestConfig {testCapacity, testRate, testThrowOnEmpty} = -- | Make a configuration that fills on overflow and throws 'EmptyBucket' on -- empty bucket. -configThrow :: MonadThrow m => Capacity -> Rate -> Config m +configThrow :: Capacity -> Rate -> TestConfig configThrow (Capacity testCapacity) (Rate testRate) = - mkConfig TestConfig{testCapacity, testRate, testThrowOnEmpty = True} + TestConfig{testCapacity, testRate, testThrowOnEmpty = True} -- | A configuration with capacity and rate 1, that fills on overflow and throws -- 'EmptyBucket' on empty bucket. 
-config11Throw :: MonadThrow m => Config m +config11Throw :: TestConfig config11Throw = configThrow (Capacity 1) (Rate 1) -- | Make a configuration that fills on overflow and does nothing on empty -- bucket. -configPure :: MonadThrow m => Capacity -> Rate -> Config m +configPure :: Capacity -> Rate -> TestConfig configPure (Capacity testCapacity) (Rate testRate) = - mkConfig TestConfig{testCapacity, testRate, testThrowOnEmpty = False} + TestConfig{testCapacity, testRate, testThrowOnEmpty = False} -- | A configuration with capacity 1 and rate 1, that fills on overflow and does -- nothing on empty bucket. -config11Pure :: MonadThrow m => Config m +config11Pure :: TestConfig config11Pure = configPure (Capacity 1) (Rate 1) --- | Strip the configuration from a 'State', so as to make it comparable, --- showable, etc. -stripConfig :: State cfg -> State () -stripConfig state = state{config=()} - --- | 'evalAgainstBucket' followed by 'stripConfig'. -stripEvalAgainstBucket :: - (MonadDelay m, MonadAsync m, MonadFork m, MonadMask m, NoThunks (m ())) => - Config m -> +stateToTestState :: State m -> TestState +stateToTestState State{level, time, paused} = + TestState{testLevel = level, testTime = time, testPaused = paused} + +-- | 'execAgainstBucket' except it takes a 'TestConfig'. +testExecAgainstBucket :: + ( MonadDelay m, + MonadAsync m, + MonadFork m, + MonadMask m, + MonadTimer m, + NoThunks (m ()) + ) => + TestConfig -> (Handlers m -> m a) -> - m (State ()) -stripEvalAgainstBucket config action = stripConfig <$> evalAgainstBucket config action + m a +testExecAgainstBucket testConfig action = + execAgainstBucket (mkConfig testConfig) action + +-- | 'evalAgainstBucket' except it takes a 'TestConfig' and returns a 'TestState'. 
+testEvalAgainstBucket :: + ( MonadDelay m, + MonadAsync m, + MonadFork m, + MonadMask m, + MonadTimer m, + NoThunks (m ()) + ) => + TestConfig -> + (Handlers m -> m a) -> + m TestState +testEvalAgainstBucket testConfig action = + stateToTestState <$> evalAgainstBucket (mkConfig testConfig) action -- | Alias for 'runSimOrThrow' by analogy to 'ioProperty'. ioSimProperty :: forall a. (forall s. IOSim s a) -> a @@ -162,10 +194,6 @@ shouldEvaluateTo a v = | otherwise -> counterexample ("Expected " ++ show v ++ "; got " ++ show result) False Left (exn :: SomeException) -> counterexample ("Expected " ++ show v ++ "; got exception " ++ displayException exn) False --- | Number of picoseconds in a second (@10^12@). -picosecondsPerSecond :: Integer -picosecondsPerSecond = 1_000_000_000_000 - -------------------------------------------------------------------------------- -- Simple properties -------------------------------------------------------------------------------- @@ -175,20 +203,20 @@ picosecondsPerSecond = 1_000_000_000_000 prop_playABit :: Property prop_playABit = ioSimProperty $ - stripEvalAgainstBucket config11Throw (\handlers -> do + testEvalAgainstBucket config11Throw (\handlers -> do threadDelay 0.5 - void $ fill handlers 67 + void $ fill' handlers 67 threadDelay 0.9 - ) `shouldEvaluateTo` State{level = 1 % 10, time = Time 1.4, paused = False, config = ()} + ) `shouldEvaluateTo` TestState{testLevel = 1 % 10, testTime = Time 1.4, testPaused = False} -- | One test case similar to 'prop_playABit' but we wait a bit too long and -- should observe the triggering of the 'onEmpty' action. 
prop_playTooLong :: Property prop_playTooLong = ioSimProperty $ - stripEvalAgainstBucket config11Throw (\handlers -> do + testEvalAgainstBucket config11Throw (\handlers -> do threadDelay 0.5 - void $ fill handlers 67 + void $ fill' handlers 67 threadDelay 1.1 ) `shouldThrow` EmptyBucket @@ -197,31 +225,31 @@ prop_playTooLong = prop_playTooLongHarmless :: Property prop_playTooLongHarmless = ioSimProperty $ - stripEvalAgainstBucket config11Pure (\handlers -> do + testEvalAgainstBucket config11Pure (\handlers -> do threadDelay 0.5 - void $ fill handlers 67 + void $ fill' handlers 67 threadDelay 1.1 - ) `shouldEvaluateTo` State{level = 0, time = Time 1.6, paused = False, config = ()} + ) `shouldEvaluateTo` TestState{testLevel = 0, testTime = Time 1.6, testPaused = False} prop_playWithPause :: Property prop_playWithPause = ioSimProperty $ - stripEvalAgainstBucket config11Throw (\handlers -> do + testEvalAgainstBucket config11Throw (\handlers -> do threadDelay 0.5 - setPaused handlers True + setPaused' handlers True threadDelay 1.5 - setPaused handlers False + setPaused' handlers False threadDelay 0.4 - ) `shouldEvaluateTo` State{level = 1 % 10, time = Time 2.4, paused = False, config = ()} + ) `shouldEvaluateTo` TestState{testLevel = 1 % 10, testTime = Time 2.4, testPaused = False} prop_playWithPauseTooLong :: Property prop_playWithPauseTooLong = ioSimProperty $ - stripEvalAgainstBucket config11Throw (\handlers -> do + testEvalAgainstBucket config11Throw (\handlers -> do threadDelay 0.5 - setPaused handlers True + setPaused' handlers True threadDelay 1.5 - setPaused handlers False + setPaused' handlers False threadDelay 0.6 ) `shouldThrow` EmptyBucket @@ -230,24 +258,24 @@ prop_playWithPauseTooLong = -- state. If the offset is positive, we should get an exception. NOTE: Do not -- use an offset of @0@. NOTE: Considering the precision, we *need* IOSim for -- this test. 
-prop_noRefill :: Integer -> Capacity -> Rate -> Property -prop_noRefill offset capacity@(Capacity c) rate@(Rate r) = do +prop_noRefill :: Bool -> Capacity -> Rate -> Property +prop_noRefill tooLong capacity@(Capacity c) rate@(Rate r) = do -- NOTE: The @-1@ is to ensure that we do not test the situation where the -- bucket empties at the *exact* same time (curtesy of IOSim) as the action. - let ps = floor (c / r * fromInteger picosecondsPerSecond) + offset + let ps = + floor (c / r * fromInteger picosecondsPerSecond) + + (if tooLong then 1 else -1) * microsecondsPerSecond time = picosecondsToDiffTime ps level = c - (ps % picosecondsPerSecond) * r - if - | offset < 0 -> + if tooLong + then ioSimProperty $ - stripEvalAgainstBucket (configThrow capacity rate) (\_ -> threadDelay time) - `shouldEvaluateTo` State{level, time = Time time, paused = False, config = ()} - | offset > 0 -> + testEvalAgainstBucket (configThrow capacity rate) (\_ -> threadDelay time) + `shouldThrow` EmptyBucket + else ioSimProperty $ - stripEvalAgainstBucket (configThrow capacity rate) (\_ -> threadDelay time) - `shouldThrow` EmptyBucket - | otherwise -> - error "prop_noRefill: do not use an offset of 0" + testEvalAgainstBucket (configThrow capacity rate) (\_ -> threadDelay time) + `shouldEvaluateTo` TestState {testLevel = level, testTime = Time time, testPaused = False} -------------------------------------------------------------------------------- -- Exception propagation @@ -263,7 +291,7 @@ instance Exception NoPlumberException prop_propagateExceptions :: Property prop_propagateExceptions = ioSimProperty $ - stripEvalAgainstBucket config11Throw (\_ -> throwIO NoPlumberException) + testEvalAgainstBucket config11Throw (\_ -> throwIO NoPlumberException) `shouldThrow` NoPlumberException @@ -271,7 +299,7 @@ prop_propagateExceptions = prop_propagateExceptionsIO :: Property prop_propagateExceptionsIO = ioProperty $ - stripEvalAgainstBucket config11Throw (\_ -> throwIO NoPlumberException) + 
testEvalAgainstBucket config11Throw (\_ -> throwIO NoPlumberException) `shouldThrow` NoPlumberException @@ -280,7 +308,7 @@ prop_propagateExceptionsIO = prop_catchException :: Property prop_catchException = ioSimProperty $ - execAgainstBucket config11Throw (\_ -> try $ threadDelay 1000) + testExecAgainstBucket config11Throw (\_ -> try $ threadDelay 1000) `shouldEvaluateTo` Left EmptyBucket @@ -290,58 +318,90 @@ prop_catchException = -- | Abstract “actions” to be run. We can either wait by some time or refill the -- bucket by some value. -data Action = ThreadDelay DiffTime | Fill Rational | SetPaused Bool +data Action + = Wait DiffTime + | Fill Rational + | SetPaused Bool + | -- | Set the configuration, then wait the given time. Setting the + -- configuration without waiting can lead to poorly defined situations. + SetConfigWait TestConfig DiffTime deriving (Eq, Show) -- | Random generation of 'Action's. The scales and frequencies are taken such -- that we explore as many interesting cases as possible. genAction :: Gen Action -genAction = frequency [ - (1, ThreadDelay . picosecondsToDiffTime <$> scale (* fromInteger picosecondsPerSecond) (arbitrary `suchThat` (>= 0))), - (1, Fill <$> scale (* 1_000_000_000_000_000) (arbitrary `suchThat` (>= 0))), - (1, SetPaused <$> arbitrary) - ] +genAction = + frequency + [ (1, Wait <$> genDelay), + (1, Fill <$> scale (* 1_000_000_000_000_000) (arbitrary `suchThat` (>= 0))), + (1, SetPaused <$> arbitrary), + (1, SetConfigWait <$> arbitrary <*> genDelay) + ] + where + genDelay = picosecondsToDiffTime <$> scale (* fromInteger picosecondsPerSecond) (arbitrary `suchThat` (>= 0)) -- | How to run the 'Action's in a monad. 
-applyActions :: MonadDelay m => Handlers m -> [Action] -> m () +applyActions :: (MonadDelay m, MonadThrow m, MonadSTM m) => Handlers m -> [Action] -> m () applyActions handlers = mapM_ $ \case - ThreadDelay t -> threadDelay t - Fill t -> void $ fill handlers t - SetPaused p -> setPaused handlers p + Wait t -> threadDelay t + Fill t -> void $ fill' handlers t + SetPaused p -> setPaused' handlers p + SetConfigWait cfg t -> do + updateConfig' handlers $ (\(l, _) -> (l, mkConfig cfg)) + threadDelay t -- | A model of what we expect the 'Action's to lead to, either an 'EmptyBucket' -- exception (if the bucket won the race) or a 'State' (otherwise). -modelActions :: TestConfig -> [Action] -> Either EmptyBucket (State TestConfig) -modelActions config = - foldM go $ State{level = testCapacity config, time = Time 0, paused = False, config} +modelActions :: TestConfig -> [Action] -> Either EmptyBucket TestState +modelActions testConfig = + (snd <$>) . foldM go (testConfig, TestState {testLevel = testCapacity testConfig, testTime = Time 0, testPaused = False}) where - go :: State TestConfig -> Action -> Either EmptyBucket (State TestConfig) - go state@State{time, level, paused, config=TestConfig{testCapacity, testRate, testThrowOnEmpty}} = \case + go :: (TestConfig, TestState) -> Action -> Either EmptyBucket (TestConfig, TestState) + go (config@TestConfig {testCapacity, testRate, testThrowOnEmpty}, state@TestState {testTime, testLevel, testPaused}) = \case Fill t -> - Right state{level = min testCapacity (level + t)} - ThreadDelay t -> - let newTime = addTime t time - newLevel = if paused then level else max 0 (level - diffTimeToSecondsRational t * testRate) + Right (config, state {testLevel = clamp (0, testCapacity) (testLevel + t)}) + Wait t -> + let newTime = addTime t testTime + newLevel = + if testPaused + then testLevel + else clamp (0, testCapacity) (testLevel - diffTimeToSecondsRational t * testRate) in if newLevel <= 0 && testThrowOnEmpty then Left EmptyBucket - else 
Right state{time = newTime, level = newLevel} - SetPaused p -> - Right state{paused = p} + else Right (config, state {testTime = newTime, testLevel = newLevel}) + SetPaused newPaused -> + Right (config, state {testPaused = newPaused}) + SetConfigWait newConfig@TestConfig {testCapacity = newTestCapacity} t -> + go (newConfig, state {testLevel = clamp (0, newTestCapacity) testLevel}) (Wait t) -- | A bunch of test cases where we generate a list of 'Action's ,run them via -- 'applyActions' and compare the result to that of 'modelActions'. -prop_random :: TestConfig -> Property -prop_random testConfig = - forAll (listOf1 genAction) $ \actions -> - let modelResult = modelActions testConfig actions - nbActions = length actions - in classify (isLeft modelResult) "bucket finished empty" $ - classify (isRight modelResult) "bucket finished non-empty" $ - classify (nbActions <= 10) "<= 10 actions" $ - classify (10 < nbActions && nbActions <= 20) "11-20 actions" $ - classify (20 < nbActions && nbActions <= 50) "21-50 actions" $ - classify (50 < nbActions) "> 50 actions" $ - runSimOrThrow ( - try $ stripEvalAgainstBucket (mkConfig testConfig) $ - flip applyActions actions - ) === (stripConfig <$> modelResult) +prop_random :: Property +prop_random = + forAllShrinkBlind + (liftArbitrary2 arbitrary (listOf1 genAction)) + (traverse (shrinkList (const []))) + $ \(testConfig, actions) -> + let result = + runSimOrThrow + ( try $ + testEvalAgainstBucket testConfig $ + flip applyActions actions + ) + modelResult = modelActions testConfig actions + nbActions = length actions + in classify (isLeft modelResult) "bucket finished empty" $ + classify (isRight modelResult) "bucket finished non-empty" $ + classify (nbActions <= 10) "<= 10 actions" $ + classify (10 < nbActions && nbActions <= 20) "11-20 actions" $ + classify (20 < nbActions && nbActions <= 50) "21-50 actions" $ + classify (50 < nbActions) "> 50 actions" $ + counterexample ("Config: " ++ show testConfig) $ + counterexample 
("Actions:\n" ++ (concat $ intersperse "\n" $ map ((" - " ++) . show) actions)) $ + counterexample ("Result: " ++ show result) $ + counterexample ("Model: " ++ show modelResult) $ + result == modelResult + +-- NOTE: Needed for GHC 8 +clamp :: Ord a => (a, a) -> a -> a +clamp (low, high) x = min high (max low x)