Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add diffusion tests #4086

Merged
merged 17 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2346,7 +2346,7 @@ withConnectionManager ConnectionManagerArguments {
(result, mbAssertion) <- atomically $ do
mbConnVar <- Map.lookup peerAddr <$> readTMVar stateVar
case mbConnVar of
Nothing -> return (UnsupportedState UnknownConnectionSt
Nothing -> return ( UnsupportedState UnknownConnectionSt
, Nothing
)
Just MutableConnState { connVar } -> do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ includeInboundConnection
includeInboundConnection =
icmIncludeConnection . withResponderMode . getConnectionManager

-- | Unregister outbound connection. Returns if the operation was successul.
-- | Unregister outbound connection. Returns if the operation was successful.
--
-- This executes:
--
Expand Down Expand Up @@ -642,8 +642,10 @@ numberOfConnections =

-- | Useful for tracing and error messages.
--
data AbstractState
= UnknownConnectionSt
data AbstractState =
-- | Unknown connection. This state indicates the connection manager
-- removed this connection from its state.
UnknownConnectionSt
| ReservedOutboundSt
| UnnegotiatedSt !Provenance
| InboundIdleSt !DataFlow
Expand Down Expand Up @@ -839,8 +841,8 @@ data ConnectionManagerTrace peerAddr handlerTrace
| TrConnectionFailure (ConnectionId peerAddr)
| TrConnectionNotFound Provenance peerAddr
| TrForbiddenOperation peerAddr AbstractState
| TrPruneConnections (Set peerAddr) -- ^ prunning set
Int -- ^ number connections that must be prunned
| TrPruneConnections (Set peerAddr) -- ^ pruning set
Int -- ^ number connections that must be pruned
(Set peerAddr) -- ^ choice set
| TrConnectionCleanup (ConnectionId peerAddr)
| TrConnectionTimeWait (ConnectionId peerAddr)
Expand Down
67 changes: 37 additions & 30 deletions ouroboros-network-framework/src/Simulation/Network/Snocket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ module Simulation.Network.Snocket
, FD
, GlobalAddressScheme (..)
, AddressType (..)
, WithAddr (..)
) where

import Prelude hiding (read)
Expand Down Expand Up @@ -134,33 +135,36 @@ mkConnection :: ( MonadLabelledSTM m
-> BearerInfo
-> ConnectionId (TestAddress addr)
-> STM m (Connection m (TestAddress addr))
mkConnection tr bearerInfo connId@ConnectionId { localAddress, remoteAddress } = do
(channelLocal, channelRemote) <-
newConnectedAttenuatedChannelPair
( ( WithAddr (Just localAddress) (Just remoteAddress)
. STAttenuatedChannelTrace connId
)
`contramap` tr)
( ( WithAddr (Just remoteAddress) (Just localAddress)
. STAttenuatedChannelTrace ConnectionId
{ localAddress = remoteAddress
, remoteAddress = localAddress
}
)
`contramap` tr)
Attenuation
{ aReadAttenuation = biOutboundAttenuation bearerInfo
, aWriteAttenuation = biOutboundWriteFailure bearerInfo
}
Attenuation
{ aReadAttenuation = biInboundAttenuation bearerInfo
, aWriteAttenuation = biInboundWriteFailure bearerInfo
}
return $ Connection channelLocal
channelRemote
(biSDUSize bearerInfo)
SYN_SENT
localAddress
mkConnection tr bearerInfo connId@ConnectionId { localAddress, remoteAddress } =
(\(connChannelLocal, connChannelRemote) ->
Connection {
connChannelLocal,
connChannelRemote,
connSDUSize = biSDUSize bearerInfo,
connState = SYN_SENT,
connProvider = localAddress
})
<$>
newConnectedAttenuatedChannelPair
( ( WithAddr (Just localAddress) (Just remoteAddress)
. STAttenuatedChannelTrace connId
)
`contramap` tr)
( ( WithAddr (Just remoteAddress) (Just localAddress)
. STAttenuatedChannelTrace ConnectionId
{ localAddress = remoteAddress
, remoteAddress = localAddress
}
)
`contramap` tr)
Attenuation
{ aReadAttenuation = biOutboundAttenuation bearerInfo
, aWriteAttenuation = biOutboundWriteFailure bearerInfo
}
Attenuation
{ aReadAttenuation = biInboundAttenuation bearerInfo
, aWriteAttenuation = biInboundWriteFailure bearerInfo
}


-- | Connection id independent of who provisioned the connection. 'NormalisedId'
Expand Down Expand Up @@ -528,6 +532,7 @@ newtype FD m peerAddr = FD { fdVar :: (StrictTVar m (FD_ m peerAddr)) }
-- Simulated snockets
--

-- TODO: use `Ouroboros.Network.ExitPolicy.WithAddr`
data WithAddr addr event =
WithAddr { waLocalAddr :: Maybe addr
, waRemoteAddr :: Maybe addr
Expand Down Expand Up @@ -733,16 +738,18 @@ mkSnocket state tr = Snocket { getLocalAddr
normalisedId = normaliseId connId

bearerInfo <- case Map.lookup normalisedId (nsAttenuationMap state) of
Nothing -> do
return (nsDefaultBearerInfo state)

Nothing -> return (nsDefaultBearerInfo state)
Just script -> stepScriptSTM script

connMap <- readTVar (nsConnections state)
case Map.lookup normalisedId connMap of
Just Connection { connState = ESTABLISHED } ->
throwSTM (connectedIOError fd_)

Just Connection { connState = SYN_SENT, connProvider }
| connProvider == localAddress ->
throwSTM (connectedIOError fd_)

Comment on lines +749 to +752
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if the error is the right one, I need to check what error real sockets would throw.

-- simultaneous open
Just conn@Connection { connState = SYN_SENT } -> do
let conn' = conn { connState = ESTABLISHED }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Ouroboros.Network.Testing.Data.Signal
, fromChangeEvents
, toChangeEvents
, fromEvents
, fromEventsWith
-- ** QuickCheck
, signalProperty
-- * Simple signal transformations
Expand Down Expand Up @@ -191,6 +192,17 @@ fromEvents (Events txs) =
]


-- | Like 'fromEvents' but it is using the given value 'a' instead of 'Nothing.
-- It is equivalent to `\a -> fmap (fromMaybe a) . fromEvents`
--
fromEventsWith :: a -> Events a -> Signal a
fromEventsWith a (Events txs) =
Signal a
[ E (TS t i') s
| E (TS t i) x <- txs
, (i', s) <- [(i, x), (i+1, a)]
]

-- | A signal can change value more than once at a single point of time.
--
-- Sometimes we are interested only in the final \"stable\" value of the signal
Expand Down Expand Up @@ -360,7 +372,10 @@ keyedTimeout d arm =
-> Set b
-> [E a]
-> [E (Set b)]
go _ _ _ [] = []
go _ armedPSQ _ [] =
(\(b, t, _) -> E (TS t 0) (Set.singleton b))
`map`
PSQ.toList armedPSQ

go armedSet armedPSQ timedout (E ts@(TS t _) x : txs)
| Just (y, t', _, armedPSQ') <- PSQ.minView armedPSQ
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ module Ouroboros.Network.PeerSelection.PeerStateActions
, MonitorPeerConnectionBlocked (..)
-- * Trace
, PeerSelectionActionsTrace (..)
, PeerStatusChangeType (..)
, FailureType (..)
) where

import Control.Concurrent.Class.MonadSTM.Strict
Expand Down
15 changes: 10 additions & 5 deletions ouroboros-network/test/Test/Ouroboros/Network/Diffusion/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ import Control.Monad ((>=>))
import Control.Monad.Class.MonadAsync
(MonadAsync (Async, wait, withAsync))
import Control.Monad.Class.MonadFork (MonadFork)
import Control.Monad.Class.MonadSay
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadThrow (MonadEvaluate, MonadMask,
MonadThrow, SomeException)
import Control.Monad.Class.MonadTime (DiffTime, MonadTime)
import Control.Monad.Class.MonadTimer (MonadTimer)
import Control.Monad.Fix (MonadFix)
import Control.Tracer (nullTracer)
import Control.Tracer (Tracer (..), nullTracer)

import Data.Foldable (foldl')
import Data.IP (IP (..))
Expand Down Expand Up @@ -136,6 +137,7 @@ data Arguments m = Arguments
, aTimeWaitTimeout :: DiffTime
, aDNSTimeoutScript :: Script DNSTimeout
, aDNSLookupDelayScript :: Script DNSLookupDelay
, aDebugTracer :: Tracer m String
}

-- The 'mockDNSActions' is not using \/ specifying 'resolverException', thus we
Expand All @@ -151,6 +153,7 @@ run :: forall resolver m.
, MonadLabelledSTM m
, MonadTraceSTM m
, MonadMask m
, MonadSay m
, MonadST m
, MonadTime m
, MonadTimer m
Expand All @@ -161,15 +164,16 @@ run :: forall resolver m.
, forall a. Semigroup a => Semigroup (m a)
, Eq (Async m Void)
)
=> Node.BlockGeneratorArgs Block StdGen
=> Tracer m String
-> Node.BlockGeneratorArgs Block StdGen
-> Node.LimitsAndTimeouts Block
-> Interfaces m
-> Arguments m
-> Diff.P2P.TracersExtra NtNAddr NtNVersion NtNVersionData
NtCAddr NtCVersion NtCVersionData
ResolverException m
-> m Void
run blockGeneratorArgs limits ni na tracersExtra =
run _debugTracer blockGeneratorArgs limits ni na tracersExtra =
Node.withNodeKernelThread blockGeneratorArgs
$ \ nodeKernel nodeKernelThread -> do
dnsTimeoutScriptVar <- LazySTM.newTVarIO (aDNSTimeoutScript na)
Expand Down Expand Up @@ -236,13 +240,14 @@ run blockGeneratorArgs limits ni na tracersExtra =
, Diff.P2P.daReturnPolicy = \_ -> 0
}

apps <- Node.applications @_ @BlockHeader nodeKernel Node.cborCodecs limits appArgs
apps <- Node.applications @_ @BlockHeader (aDebugTracer na) nodeKernel Node.cborCodecs limits appArgs

registry <- newFetchClientRegistry

withAsync
(Diff.P2P.runM interfaces
Diff.nullTracers tracersExtra
Diff.nullTracers
tracersExtra
args argsExtra apps appsExtra)
$ \ diffusionThread ->
withAsync (blockFetch registry nodeKernel) $ \blockFetchLogicThread ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}

-- orphaned 'ShowProxy PingPong' instance.
{-# OPTIONS_GHC -Wno-orphans #-}
Expand All @@ -19,11 +20,12 @@ import qualified Control.Concurrent.Class.MonadSTM as LazySTM
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSay
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Tracer (nullTracer)
import Control.Tracer (Tracer (..), contramap, nullTracer)
import Data.ByteString.Lazy (ByteString)
import Data.Functor (($>))
import Data.Maybe (fromMaybe)
Expand Down Expand Up @@ -177,23 +179,26 @@ applications :: forall block header m.
( MonadAsync m
, MonadFork m
, MonadMask m
, MonadSay m
, MonadThrow m
, MonadTime m
, MonadTimer m
, MonadThrow (STM m)
, HasHeader header
, HasHeader block
, HeaderHash header ~ HeaderHash block
, Show block
, ShowProxy block
)
=> NodeKernel header block m
=> Tracer m String
-> NodeKernel header block m
-> Codecs block m
-> LimitsAndTimeouts block
-> AppArgs m
-> m (Diff.Applications NtNAddr NtNVersion NtNVersionData
NtCAddr NtCVersion NtCVersionData
m ())
applications nodeKernel
applications debugTracer nodeKernel
Codecs { chainSyncCodec, blockFetchCodec
, keepAliveCodec, pingPongCodec }
limits
Expand Down Expand Up @@ -261,14 +266,14 @@ applications nodeKernel
blockFetchResponder
}
]
, withWarm = WithWarm $ \ _connId controlMessageSTM ->
, withWarm = WithWarm $ \ connId controlMessageSTM ->
[ MiniProtocol
{ miniProtocolNum = MiniProtocolNum 9
, miniProtocolLimits = pingPongLimits limits
, miniProtocolRun =
InitiatorAndResponderProtocol
(pingPongInitiator controlMessageSTM)
pingPongResponder
(pingPongInitiator connId controlMessageSTM)
(pingPongResponder connId)
}
]
, withEstablished = WithEstablished $ \ connId controlMessageSTM ->
Expand Down Expand Up @@ -376,13 +381,13 @@ applications nodeKernel
:: ConnectionId NtNAddr
-> ControlMessageSTM m
-> MuxPeer ByteString m ()
keepAliveInitiator ConnectionId { remoteAddress }
keepAliveInitiator connId@ConnectionId { remoteAddress }
controlMessageSTM =
MuxPeerRaw $ \channel -> do
labelThisThread "KeepAliveClient"
let kacApp =
\ctxVar -> runPeerWithLimits
nullTracer
((show . (connId,)) `contramap` debugTracer)
keepAliveCodec
(keepAliveSizeLimits limits)
(keepAliveTimeLimits limits)
Expand Down Expand Up @@ -412,11 +417,12 @@ applications nodeKernel
(keepAliveServerPeer keepAliveServer)

pingPongInitiator
:: ControlMessageSTM m
:: ConnectionId NtNAddr
-> ControlMessageSTM m
-> MuxPeer ByteString m ()
pingPongInitiator controlMessageSTM = MuxPeerRaw $ \channel ->
pingPongInitiator connId controlMessageSTM = MuxPeerRaw $ \channel ->
runPeerWithLimits
nullTracer
((show . (connId,)) `contramap` debugTracer)
pingPongCodec
(pingPongSizeLimits limits)
(pingPongTimeLimits limits)
Expand Down Expand Up @@ -451,10 +457,11 @@ applications nodeKernel
else return $ PingPong.SendMsgDone ()

pingPongResponder
:: MuxPeer ByteString m ()
pingPongResponder = MuxPeerRaw $ \channel ->
:: ConnectionId NtNAddr
-> MuxPeer ByteString m ()
pingPongResponder connId = MuxPeerRaw $ \channel ->
runPeerWithLimits
nullTracer
((show . (connId,)) `contramap` debugTracer)
pingPongCodec
(pingPongSizeLimits limits)
(pingPongTimeLimits limits)
Expand Down
Loading