Skip to content

Commit

Permalink
PLT-380 Replace TChan with bounded queue (#502)
Browse files Browse the repository at this point in the history
* Remove unreachable code.

I had moved the place where the exception is thrown and forgot to remove
the previous code.

* Replace TChan with MVar

* Add comment about MVar choice

* Incorporate more feedback
  • Loading branch information
andreabedini authored Jun 13, 2022
1 parent 6a6f8c4 commit a61ba8f
Showing 1 changed file with 30 additions and 32 deletions.
62 changes: 30 additions & 32 deletions plutus-streaming/src/Plutus/Streaming.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsg
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync)
import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, SomeException (SomeException), catch, throw)
import GHC.Generics (Generic)
import Streaming (Of, Stream)
Expand All @@ -41,21 +41,30 @@ withChainSyncEventStream ::
NetworkId ->
-- | The point on the chain to start streaming from
ChainPoint ->
-- | Stream consumer
-- | The stream consumer
(Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b) ->
IO b
withChainSyncEventStream socketPath networkId point consumer = do
-- The chain-sync client runs in a different thread and it will send us
-- block through this channel.
-- The chain-sync client runs in a different thread passing the blocks it
-- receives to the stream consumer through a MVar. The chain-sync client
-- thread and the stream consumer will each block on each other and stay
-- in lockstep.
--
-- NOTE: choosing a MVar is a tradeoff towards simplicity. In this case a
-- (bounded) queue could perform better. Indeed a properly-sized buffer
-- can reduce the time the two threads are blocked waiting for each
-- other. The problem here is "properly-sized". A bounded queue like
-- Control.Concurrent.STM.TBQueue allows us to specify a max queue length
-- but block size can vary a lot (TODO quantify this) depending on the
-- era. We have an alternative implementation with customizable queue
-- size (TBMQueue) but it needs to be extracted from the
-- plutus-chain-index-core package. Using a simple MVar doesn't seem to
-- slow down marconi's indexing, likely because the difference is
-- negligeable compared to existing network and IO latencies. Therefore,
-- let's stick with a MVar now and revisit later.
nextBlockVar <- newEmptyMVar

-- By using newBroadcastTChan, messages can be garbage collected after
-- clients have seen them, preventing pile up. The only way to read a
-- broadcast channel is to duplicate it with dupTChan. See note at
-- `newBroadcastTChan`.
chan <- newBroadcastTChanIO
readerChannel <- atomically $ dupTChan chan

let client = chainSyncStreamingClient point chan
let client = chainSyncStreamingClient point nextBlockVar

localNodeClientProtocols =
LocalNodeClientProtocols
Expand All @@ -77,18 +86,11 @@ withChainSyncEventStream socketPath networkId point consumer = do
-- cardano-node.
epochSlots = EpochSlots 21600

clientThread = do
connectToLocalNode connectInfo localNodeClientProtocols
-- the only reason connectToLocalNode can terminate successfully is if it
-- doesn't find an intersection, we report that case to the
-- consumer as an exception
throw NoIntersectionFound

withAsync clientThread $ \a -> do
withAsync (connectToLocalNode connectInfo localNodeClientProtocols) $ \a -> do
-- Make sure all exceptions in the client thread are passed to the consumer thread
link a
-- Run the consumer
consumer $ S.repeatM $ atomically (readTChan readerChannel)
consumer $ S.repeatM $ takeMVar nextBlockVar
-- Let's rethrow exceptions from the client thread unwrapped, so that the
-- consumer does not have to know anything about async
`catch` \(ExceptionInLinkedThread _ (SomeException e)) -> throw e
Expand All @@ -97,26 +99,22 @@ withChainSyncEventStream socketPath networkId point consumer = do
-- and runs the chain-sync mini-protocol. This client is fire-and-forget
-- and does not require any control.
--
-- Blocks obtained from the chain-sync mini-protocol are passed to a
-- consumer through a channel.
--
-- If the starting point is such that an intersection cannot be found, this
-- client will throw a NoIntersectionFound exception.
chainSyncStreamingClient ::
ChainPoint ->
TChan (ChainSyncEvent e) ->
MVar (ChainSyncEvent e) ->
ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient point chan =
chainSyncStreamingClient point nextChainEventVar =
ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect
where
onIntersect =
ClientStIntersect
{ recvMsgIntersectFound = \_ _ ->
ChainSyncClient sendRequestNext,
recvMsgIntersectNotFound = \_ ->
ChainSyncClient $
-- There is nothing we can do here
throw NoIntersectionFound
recvMsgIntersectNotFound =
-- There is nothing we can do here
throw NoIntersectionFound
}

sendRequestNext =
Expand All @@ -126,10 +124,10 @@ chainSyncStreamingClient point chan =
ClientStNext
{ recvMsgRollForward = \bim ct ->
ChainSyncClient $ do
atomically $ writeTChan chan (RollForward bim ct)
putMVar nextChainEventVar (RollForward bim ct)
sendRequestNext,
recvMsgRollBackward = \cp ct ->
ChainSyncClient $ do
atomically $ writeTChan chan (RollBackward cp ct)
putMVar nextChainEventVar (RollBackward cp ct)
sendRequestNext
}

0 comments on commit a61ba8f

Please sign in to comment.