diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs index a1a818230c..33716cbab3 100644 --- a/plutus-streaming/src/Plutus/Streaming.hs +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -15,7 +15,7 @@ import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsg ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound), ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward)) import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync) -import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan) +import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar) import Control.Exception (Exception, SomeException (SomeException), catch, throw) import GHC.Generics (Generic) import Streaming (Of, Stream) @@ -41,21 +41,30 @@ withChainSyncEventStream :: NetworkId -> -- | The point on the chain to start streaming from ChainPoint -> - -- | Stream consumer + -- | The stream consumer (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b) -> IO b withChainSyncEventStream socketPath networkId point consumer = do - -- The chain-sync client runs in a different thread and it will send us - -- block through this channel. + -- The chain-sync client runs in a different thread passing the blocks it + -- receives to the stream consumer through a MVar. The chain-sync client + -- thread and the stream consumer will each block on each other and stay + -- in lockstep. + -- + -- NOTE: choosing a MVar is a tradeoff towards simplicity. In this case a + -- (bounded) queue could perform better. Indeed a properly-sized buffer + -- can reduce the time the two threads are blocked waiting for each + -- other. The problem here is "properly-sized". A bounded queue like + -- Control.Concurrent.STM.TBQueue allows us to specify a max queue length + -- but block size can vary a lot (TODO quantify this) depending on the + -- era. We have an alternative implementation with customizable queue + -- size (TBMQueue) but it needs to be extracted from the + -- plutus-chain-index-core package. Using a simple MVar doesn't seem to + -- slow down marconi's indexing, likely because the difference is + -- negligeable compared to existing network and IO latencies. Therefore, + -- let's stick with a MVar now and revisit later. + nextBlockVar <- newEmptyMVar - -- By using newBroadcastTChan, messages can be garbage collected after - -- clients have seen them, preventing pile up. The only way to read a - -- broadcast channel is to duplicate it with dupTChan. See note at - -- `newBroadcastTChan`. - chan <- newBroadcastTChanIO - readerChannel <- atomically $ dupTChan chan - - let client = chainSyncStreamingClient point chan + let client = chainSyncStreamingClient point nextBlockVar localNodeClientProtocols = LocalNodeClientProtocols @@ -77,18 +86,11 @@ withChainSyncEventStream socketPath networkId point consumer = do -- cardano-node. epochSlots = EpochSlots 21600 - clientThread = do - connectToLocalNode connectInfo localNodeClientProtocols - -- the only reason connectToLocalNode can terminate successfully is if it - -- doesn't find an intersection, we report that case to the - -- consumer as an exception - throw NoIntersectionFound - - withAsync clientThread $ \a -> do + withAsync (connectToLocalNode connectInfo localNodeClientProtocols) $ \a -> do -- Make sure all exceptions in the client thread are passed to the consumer thread link a -- Run the consumer - consumer $ S.repeatM $ atomically (readTChan readerChannel) + consumer $ S.repeatM $ takeMVar nextBlockVar -- Let's rethrow exceptions from the client thread unwrapped, so that the -- consumer does not have to know anything about async `catch` \(ExceptionInLinkedThread _ (SomeException e)) -> throw e @@ -97,26 +99,22 @@ withChainSyncEventStream socketPath networkId point consumer = do -- and runs the chain-sync mini-protocol. This client is fire-and-forget -- and does not require any control. -- --- Blocks obtained from the chain-sync mini-protocol are passed to a --- consumer through a channel. --- -- If the starting point is such that an intersection cannot be found, this -- client will throw a NoIntersectionFound exception. chainSyncStreamingClient :: ChainPoint -> - TChan (ChainSyncEvent e) -> + MVar (ChainSyncEvent e) -> ChainSyncClient e ChainPoint ChainTip IO () -chainSyncStreamingClient point chan = +chainSyncStreamingClient point nextChainEventVar = ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect where onIntersect = ClientStIntersect { recvMsgIntersectFound = \_ _ -> ChainSyncClient sendRequestNext, - recvMsgIntersectNotFound = \_ -> - ChainSyncClient $ - -- There is nothing we can do here - throw NoIntersectionFound + recvMsgIntersectNotFound = + -- There is nothing we can do here + throw NoIntersectionFound } sendRequestNext = @@ -126,10 +124,10 @@ chainSyncStreamingClient point chan = ClientStNext { recvMsgRollForward = \bim ct -> ChainSyncClient $ do - atomically $ writeTChan chan (RollForward bim ct) + putMVar nextChainEventVar (RollForward bim ct) sendRequestNext, recvMsgRollBackward = \cp ct -> ChainSyncClient $ do - atomically $ writeTChan chan (RollBackward cp ct) + putMVar nextChainEventVar (RollBackward cp ct) sendRequestNext }