Skip to content

Commit

Permalink
Simple cardano-client subscriber & mux changes (#4979)
Browse files Browse the repository at this point in the history
* node-to-client: expose connectToWithMux

Expose `connectTo` version which accepts a callback that has access to
`Mux`.  This allows to implement ones own logic for:
* starting approriate mini-protocols (one doesn't need to start all of
  them).
* implement ones own restarting policy for `mini-protocols`, without
  terminating a connection.

* ouroboros-network-framework: added ConnectToArgs record

* added `ConnectToArgs`
* `connectToNode` and friends return the result of the first terminated
  mini-protocol (or its error)

* cardano-client: subscription without subscribtion worker

* network-mux: generalised Channel type

Provide a `Channel` which can send arbitrary data.  This is useful if
one wants to test application with or without the multiplexer.  For us
this simplifies `Ouroboros.Network.Channel` module in the
`ouroboros-network-framework` package.

* network-mux: added name field to MuxBearer

It is used to name various shared stm variables.
NOTE: we name them when we `runMux`, not when `newMux` is created.

* Updated CHANGELOG.md files
  • Loading branch information
coot authored Oct 13, 2024
1 parent 53ceed0 commit e606615
Show file tree
Hide file tree
Showing 32 changed files with 772 additions and 591 deletions.
10 changes: 10 additions & 0 deletions cardano-client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

## next version

### Breaking changes

* Reimplemntation of `subscribe` without relaying on non-p2p stack. Its
arguments have changed. Note that the `NodeToClientProtocols` and
`OuroborosApplicationWithMinimalCtx` specify `Void` as return type of the
responder side.
* The default reconnect delay was increased from `0.025s` to `5s`.

### Non-breaking changes

## 0.3.1.5 -- 2024-08-27

### Breaking changes
Expand Down
3 changes: 3 additions & 0 deletions cardano-client/cardano-client.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ library
build-depends:
base >=4.14 && <4.21,
bytestring >=0.10 && <0.13,
cborg,
containers,
contra-tracer,
network-mux ^>=0.4.5,
ouroboros-network >=0.9 && <0.18,
ouroboros-network-api >=0.5.2 && <0.10,
ouroboros-network-framework >=0.8 && <0.14,
si-timers,

ghc-options:
-Wall
Expand Down
149 changes: 116 additions & 33 deletions cardano-client/src/Cardano/Client/Subscription.hs
Original file line number Diff line number Diff line change
@@ -1,24 +1,40 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

module Cardano.Client.Subscription
( subscribe
( -- * Subscription API
subscribe
, SubscriptionParams (..)
, SubscriptionTracers (..)
, SubscriptionTrace (..)
-- * Re-exports
-- ** Mux
, MuxMode (..)
, ConnectionId
, LocalAddress
, MuxTrace
, WithMuxBearer
-- ** Connections
, ConnectionId (..)
, LocalAddress (..)
-- ** Protocol API
, NodeToClientProtocols (..)
, MiniProtocolCb (..)
, MuxTrace
, RunMiniProtocol (..)
, WithMuxBearer
, ControlMessage (..)
) where

import Codec.CBOR.Term qualified as CBOR
import Control.Exception
import Control.Monad (join)
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, traceWith)
import Data.ByteString.Lazy qualified as BSL
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Void (Void)

import Network.Mux.Trace (MuxTrace, WithMuxBearer)
Expand All @@ -27,15 +43,47 @@ import Ouroboros.Network.ControlMessage (ControlMessage (..))
import Ouroboros.Network.Magic (NetworkMagic)
import Ouroboros.Network.Mux (MiniProtocolCb (..), MuxMode (..),
OuroborosApplicationWithMinimalCtx, RunMiniProtocol (..))
import Ouroboros.Network.NodeToClient (ClientSubscriptionParams (..),
ConnectionId, LocalAddress, NetworkClientSubcriptionTracers,
NodeToClientProtocols (..), NodeToClientVersion,
NodeToClientVersionData (NodeToClientVersionData),
ncSubscriptionWorker, newNetworkMutableState,
versionedNodeToClientProtocols)
import Ouroboros.Network.Protocol.Handshake.Version (Versions, foldMapVersions)

import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.NodeToClient (Handshake, LocalAddress (..),
NetworkConnectTracers (..), NodeToClientProtocols,
NodeToClientVersion, NodeToClientVersionData (..), TraceSendRecv,
Versions)
import Ouroboros.Network.NodeToClient qualified as NtC
import Ouroboros.Network.Snocket qualified as Snocket

data SubscriptionParams a = SubscriptionParams
{ spAddress :: !LocalAddress
-- ^ unix socket or named pipe address
, spReconnectionDelay :: !(Maybe DiffTime)
-- ^ delay between connection attempts. The default value is `5s`.
, spCompleteCb :: Either SomeException a -> Decision
}

data Decision =
Abort
-- ^ abort subscription loop
| Reconnect
-- ^ reconnect

data SubscriptionTracers a = SubscriptionTracers {
stMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId LocalAddress) MuxTrace),
-- ^ low level mux-network tracer, which logs mux sdu (send and received)
-- and other low level multiplexing events.
stHandshakeTracer :: Tracer IO (WithMuxBearer (ConnectionId LocalAddress)
(TraceSendRecv (Handshake NodeToClientVersion CBOR.Term))),
-- ^ handshake protocol tracer; it is important for analysing version
-- negotation mismatches.
stSubscriptionTracer :: Tracer IO (SubscriptionTrace a)
}

data SubscriptionTrace a =
SubscriptionResult a
| SubscriptionError SomeException
| SubscriptionReconnect
| SubscriptionTerminate
deriving Show

-- | Subscribe using `node-to-client` mini-protocol.
--
-- 'blockVersion' ought to be instantiated with `BlockNodeToClientVersion blk`.
Expand All @@ -44,34 +92,66 @@ import Ouroboros.Network.Snocket qualified as Snocket
-- `Ouroboros.Consensus.Network.NodeToClient.clientCodecs`.
--
subscribe
:: forall blockVersion x y.
:: forall blockVersion a.
Snocket.LocalSnocket
-> NetworkMagic
-> Map NodeToClientVersion blockVersion
-- ^ Use `supportedNodeToClientVersions` from `ouroboros-consensus`.
-> NetworkClientSubcriptionTracers
-> ClientSubscriptionParams ()
-> SubscriptionTracers a
-> SubscriptionParams a
-> ( NodeToClientVersion
-> blockVersion
-> NodeToClientProtocols 'InitiatorMode LocalAddress BSL.ByteString IO x y)
-> IO Void
subscribe snocket networkMagic supportedVersions tracers subscriptionParams protocols = do
networkState <- newNetworkMutableState
ncSubscriptionWorker
snocket
tracers
networkState
subscriptionParams
(versionedProtocols networkMagic supportedVersions protocols)
-> NodeToClientProtocols 'InitiatorMode LocalAddress BSL.ByteString IO a Void)
-> IO ()
subscribe snocket networkMagic supportedVersions
SubscriptionTracers {
stMuxTracer = muxTracer,
stHandshakeTracer = handshakeTracer,
stSubscriptionTracer = tracer
}
SubscriptionParams {
spAddress = addr,
spReconnectionDelay = reConnDelay,
spCompleteCb = completeCb
}
protocols =
mask $ \unmask ->
loop unmask $
NtC.connectTo
snocket
NetworkConnectTracers {
nctMuxTracer = muxTracer,
nctHandshakeTracer = handshakeTracer
}
(versionedProtocols networkMagic supportedVersions protocols)
(getFilePath addr)
where
loop :: (forall x. IO x -> IO x) -> IO (Either SomeException a) -> IO ()
loop unmask act = do
r <- squashLefts <$> try (unmask act)
case r of
Right a -> traceWith tracer (SubscriptionResult a)
Left e -> traceWith tracer (SubscriptionError e)
case completeCb r of
Abort ->
traceWith tracer SubscriptionTerminate
Reconnect -> do
traceWith tracer SubscriptionReconnect
threadDelay (fromMaybe 5 reConnDelay)
loop unmask act

squashLefts :: forall x y. Either x (Either x y) -> Either x y
squashLefts = join


versionedProtocols ::
forall m appType bytes blockVersion a b.
forall m appType bytes blockVersion a.
NetworkMagic
-> Map NodeToClientVersion blockVersion
-- ^ Use `supportedNodeToClientVersions` from `ouroboros-consensus`.
-> ( NodeToClientVersion
-> blockVersion
-> NodeToClientProtocols appType LocalAddress bytes m a b)
-> NodeToClientProtocols appType LocalAddress bytes m a Void)
-- ^ callback which receives codecs, connection id and STM action which
-- can be checked if the networking runtime system requests the protocols
-- to stop.
Expand All @@ -82,18 +162,21 @@ versionedProtocols ::
-> Versions
NodeToClientVersion
NodeToClientVersionData
(OuroborosApplicationWithMinimalCtx appType LocalAddress bytes m a b)
(OuroborosApplicationWithMinimalCtx appType LocalAddress bytes m a Void)
versionedProtocols networkMagic supportedVersions callback =
foldMapVersions applyVersion $ Map.toList supportedVersions
NtC.foldMapVersions applyVersion (Map.toList supportedVersions)
where
applyVersion
:: (NodeToClientVersion, blockVersion)
-> Versions
NodeToClientVersion
NodeToClientVersionData
(OuroborosApplicationWithMinimalCtx appType LocalAddress bytes m a b)
(OuroborosApplicationWithMinimalCtx appType LocalAddress bytes m a Void)
applyVersion (version, blockVersion) =
versionedNodeToClientProtocols
NtC.versionedNodeToClientProtocols
version
(NodeToClientVersionData networkMagic False)
NodeToClientVersionData {
networkMagic,
query = False
}
(callback version blockVersion)
4 changes: 4 additions & 0 deletions network-mux/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
* `Ouroboros.Network.Mux.mkMiniProtocolBundle` was renamed to
`mkMiniProtocolInfos`, its type changed.
* Removed `MiniProtocolBundle` newtype wrapper.
* Generalised `Channel` type and provide `ByteChannel` type alias.
* Provide additional APIs in the `Network.Mux.Channel` for creating channels
and byte channels.
* `MuxBearer` has a `name` field.

### Non-breaking changes

Expand Down
22 changes: 14 additions & 8 deletions network-mux/src/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ module Network.Mux
, runMux
, runMiniProtocol
, StartOnDemandOrEagerly (..)
, ByteChannel
, Channel (..)
, stopMux
-- * Bearer
, MuxBearer
Expand Down Expand Up @@ -114,7 +116,7 @@ data MuxStatus
-- | Create a mux handle.
--
newMux :: forall (mode :: MuxMode) m.
MonadSTM m
MonadLabelledSTM m
=> [MiniProtocolInfo mode]
-- ^ description of protocols run by the mux layer. Only these protocols
-- one will be able to execute.
Expand Down Expand Up @@ -215,9 +217,13 @@ runMux :: forall m mode.
-> Mux mode m
-> MuxBearer m
-> m ()
runMux tracer Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer = do
runMux tracer Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer@MuxBearer {name} = do
egressQueue <- atomically $ newTBQueue 100
labelTBQueueIO egressQueue "mux-eq"

-- label shared variables
labelTBQueueIO egressQueue (name ++ "-mux-egress")
labelTVarIO muxStatus (name ++ "-mux-status")
labelTQueueIO muxControlCmdQueue (name ++ "-mux-ctrl")

JobPool.withJobPool
(\jobpool -> do
Expand Down Expand Up @@ -246,13 +252,13 @@ runMux tracer Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer = do
JobPool.Job (muxer egressQueue bearer)
(return . MuxerException)
MuxJob
"muxer"
(name ++ "-muxer")

demuxerJob =
JobPool.Job (demuxer (Map.elems muxMiniProtocols) bearer)
(return . DemuxerException)
MuxJob
"demuxer"
(name ++ "-demuxer")

miniProtocolJob
:: forall mode m.
Expand Down Expand Up @@ -329,7 +335,7 @@ data StartOnDemandOrEagerly = StartOnDemand | StartEagerly
deriving Eq

data MiniProtocolAction m where
MiniProtocolAction :: (Channel m -> m (a, Maybe BL.ByteString)) -- ^ Action
MiniProtocolAction :: (ByteChannel m -> m (a, Maybe BL.ByteString)) -- ^ Action
-> StrictTMVar m (Either SomeException a) -- ^ Completion var
-> MiniProtocolAction m

Expand Down Expand Up @@ -551,7 +557,7 @@ muxChannel
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> Channel m
-> ByteChannel m
muxChannel tracer egressQueue want@(Wanton w) mc md q =
Channel { send, recv}
where
Expand Down Expand Up @@ -638,7 +644,7 @@ runMiniProtocol :: forall mode m a.
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (Channel m -> m (a, Maybe BL.ByteString))
-> (ByteChannel m -> m (a, Maybe BL.ByteString))
-> m (STM m (Either SomeException a))
runMiniProtocol Mux { muxMiniProtocols, muxControlCmdQueue , muxStatus}
ptclNum ptclDir startMode protocolAction
Expand Down
3 changes: 2 additions & 1 deletion network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ attenuationChannelAsMuxBearer sduSize sduTimeout muxTracer chan =
MuxBearer {
read = readMux,
write = writeMux,
sduSize
sduSize,
name = "attenuation-channel"
}
where
readMux :: TimeoutFn m -> m (MuxSDU, Time)
Expand Down
3 changes: 2 additions & 1 deletion network-mux/src/Network/Mux/Bearer/NamedPipe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ namedPipeAsBearer sduSize tracer h =
Mx.MuxBearer {
Mx.read = readNamedPipe,
Mx.write = writeNamedPipe,
Mx.sduSize = sduSize
Mx.sduSize = sduSize,
Mx.name = "named-pipe"
}
where
readNamedPipe :: Mx.TimeoutFn IO -> IO (Mx.MuxSDU, Time)
Expand Down
3 changes: 2 additions & 1 deletion network-mux/src/Network/Mux/Bearer/Pipe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ pipeAsMuxBearer sduSize tracer channel =
Mx.MuxBearer {
Mx.read = readPipe,
Mx.write = writePipe,
Mx.sduSize = sduSize
Mx.sduSize = sduSize,
Mx.name = "pipe"
}
where
readPipe :: Mx.TimeoutFn IO -> IO (Mx.MuxSDU, Time)
Expand Down
3 changes: 2 additions & 1 deletion network-mux/src/Network/Mux/Bearer/Queues.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ queueChannelAsMuxBearer sduSize tracer QueueChannel { writeQueue, readQueue } =
Mx.MuxBearer {
Mx.read = readMux,
Mx.write = writeMux,
Mx.sduSize = sduSize
Mx.sduSize = sduSize,
Mx.name = "queue-channel"
}
where
readMux :: Mx.TimeoutFn m -> m (Mx.MuxSDU, Time)
Expand Down
3 changes: 2 additions & 1 deletion network-mux/src/Network/Mux/Bearer/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ socketAsMuxBearer sduSize sduTimeout tracer sd =
Mx.MuxBearer {
Mx.read = readSocket,
Mx.write = writeSocket,
Mx.sduSize = sduSize
Mx.sduSize = sduSize,
Mx.name = "socket-bearer"
}
where
hdrLenght = 8
Expand Down
Loading

0 comments on commit e606615

Please sign in to comment.