Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PLT-380 Replace TChan with bounded queue #502

Merged
merged 4 commits into from
Jun 13, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 30 additions & 32 deletions plutus-streaming/src/Plutus/Streaming.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsg
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync)
import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, SomeException (SomeException), catch, throw)
import GHC.Generics (Generic)
import Streaming (Of, Stream)
Expand All @@ -41,21 +41,30 @@ withChainSyncEventStream ::
NetworkId ->
-- | The point on the chain to start streaming from
ChainPoint ->
-- | Stream consumer
-- | The stream consumer
(Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b) ->
IO b
withChainSyncEventStream socketPath networkId point consumer = do
-- The chain-sync client runs in a different thread and it will send us
-- block through this channel.
-- The chain-sync client runs in a different thread passing the blocks it
-- receives to the stream consumer through a MVar. The chain-sync client
-- thread and the stream consumer will each block on each other and stay
-- in lockstep.
--
-- NOTE: choosing a MVar is a tradeoff towards simplicity. In this case a
-- (bounded) queue could perform better. Indeed a properly-sized buffer
-- can reduce the time the two threads are blocked waiting for each
-- other. The problem here is "properly-sized". A bounded queue like
-- Control.Concurrent.STM.TBQueue allows us to specify a max queue length
-- but block size can vary a lot (TODO quantify this) depending on the
-- era. We have an alternative implementation with customizable queue
-- size (TBMQueue) but it needs to be extracted from the
-- plutus-chain-index-core package. Using a simple MVar doesn't seem to
-- slow down marconi's indexing, likely because the difference is
-- negligeable compared to existing network and IO latencies. Therefore,
-- let's stick with a MVar now and revisit later.
nextBlockVar <- newEmptyMVar

-- By using newBroadcastTChan, messages can be garbage collected after
-- clients have seen them, preventing pile up. The only way to read a
-- broadcast channel is to duplicate it with dupTChan. See note at
-- `newBroadcastTChan`.
chan <- newBroadcastTChanIO
readerChannel <- atomically $ dupTChan chan

let client = chainSyncStreamingClient point chan
let client = chainSyncStreamingClient point nextBlockVar

localNodeClientProtocols =
LocalNodeClientProtocols
Expand All @@ -74,18 +83,11 @@ withChainSyncEventStream socketPath networkId point consumer = do
-- FIXME this comes from the config file but Cardano.Api does not expose readNetworkConfig!
epochSlots = EpochSlots 40

clientThread = do
connectToLocalNode connectInfo localNodeClientProtocols
-- the only reason connectToLocalNode can terminate successfully is if it
-- doesn't find an intersection, we report that case to the
-- consumer as an exception
throw NoIntersectionFound

withAsync clientThread $ \a -> do
withAsync (connectToLocalNode connectInfo localNodeClientProtocols) $ \a -> do
-- Make sure all exceptions in the client thread are passed to the consumer thread
link a
-- Run the consumer
consumer $ S.repeatM $ atomically (readTChan readerChannel)
consumer $ S.repeatM $ takeMVar nextBlockVar
-- Let's rethrow exceptions from the client thread unwrapped, so that the
-- consumer does not have to know anything about async
`catch` \(ExceptionInLinkedThread _ (SomeException e)) -> throw e
Expand All @@ -94,26 +96,22 @@ withChainSyncEventStream socketPath networkId point consumer = do
-- and runs the chain-sync mini-protocol. This client is fire-and-forget
-- and does not require any control.
--
-- Blocks obtained from the chain-sync mini-protocol are passed to a
-- consumer through a channel.
--
-- If the starting point is such that an intersection cannot be found, this
-- client will throw a NoIntersectionFound exception.
chainSyncStreamingClient ::
ChainPoint ->
TChan (ChainSyncEvent e) ->
MVar (ChainSyncEvent e) ->
ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient point chan =
chainSyncStreamingClient point nextChainEventVar =
ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect
where
onIntersect =
ClientStIntersect
{ recvMsgIntersectFound = \_ _ ->
ChainSyncClient sendRequestNext,
recvMsgIntersectNotFound = \_ ->
ChainSyncClient $
-- There is nothing we can do here
throw NoIntersectionFound
recvMsgIntersectNotFound =
-- There is nothing we can do here
throw NoIntersectionFound
}

sendRequestNext =
Expand All @@ -123,10 +121,10 @@ chainSyncStreamingClient point chan =
ClientStNext
{ recvMsgRollForward = \bim ct ->
ChainSyncClient $ do
atomically $ writeTChan chan (RollForward bim ct)
putMVar nextChainEventVar (RollForward bim ct)
sendRequestNext,
recvMsgRollBackward = \cp ct ->
ChainSyncClient $ do
atomically $ writeTChan chan (RollBackward cp ct)
putMVar nextChainEventVar (RollBackward cp ct)
sendRequestNext
}