From d13291713daecca6cdc1813179e9e2de4b23367a Mon Sep 17 00:00:00 2001 From: Andrea Bedini Date: Thu, 9 Jun 2022 14:01:33 +0800 Subject: [PATCH] Use a bounded channel inside withChainSyncEventStream Using a unbounded channel can cause memory issues with the consumer is slower than the producer. Since the producer is a locally connected node, this is most likely the case. --- plutus-streaming/src/Plutus/Streaming.hs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs index fb07a454d7..e260f1d214 100644 --- a/plutus-streaming/src/Plutus/Streaming.hs +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -15,7 +15,8 @@ import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsg ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound), ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward)) import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync) -import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan) +import Control.Concurrent.STM (atomically) +import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue, writeTBQueue) import Control.Exception (Exception, SomeException (SomeException), catch, throw) import GHC.Generics (Generic) import Streaming (Of, Stream) @@ -48,14 +49,11 @@ withChainSyncEventStream socketPath networkId point consumer = do -- The chain-sync client runs in a different thread and it will send us -- block through this channel. - -- By using newBroadcastTChan, messages can be garbage collected after - -- clients have seen them, preventing pile up. The only way to read a - -- broadcast channel is to duplicate it with dupTChan. See note at - -- `newBroadcastTChan`. - chan <- newBroadcastTChanIO - readerChannel <- atomically $ dupTChan chan + -- On local testing we do about 1000 events per second next to a local + -- node. One second of buffer sounds reasonable to me. + queue <- newTBQueueIO 1000 - let client = chainSyncStreamingClient point chan + let client = chainSyncStreamingClient point queue localNodeClientProtocols = LocalNodeClientProtocols @@ -85,7 +83,7 @@ withChainSyncEventStream socketPath networkId point consumer = do -- Make sure all exceptions in the client thread are passed to the consumer thread link a -- Run the consumer - consumer $ S.repeatM $ atomically (readTChan readerChannel) + consumer $ S.repeatM $ atomically (readTBQueue queue) -- Let's rethrow exceptions from the client thread unwrapped, so that the -- consumer does not have to know anything about async `catch` \(ExceptionInLinkedThread _ (SomeException e)) -> throw e @@ -101,9 +99,9 @@ withChainSyncEventStream socketPath networkId point consumer = do -- client will throw a NoIntersectionFound exception. chainSyncStreamingClient :: ChainPoint -> - TChan (ChainSyncEvent e) -> + TBQueue (ChainSyncEvent e) -> ChainSyncClient e ChainPoint ChainTip IO () -chainSyncStreamingClient point chan = +chainSyncStreamingClient point queue = ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect where onIntersect = @@ -123,10 +121,10 @@ chainSyncStreamingClient point chan = ClientStNext { recvMsgRollForward = \bim ct -> ChainSyncClient $ do - atomically $ writeTChan chan (RollForward bim ct) + atomically $ writeTBQueue queue (RollForward bim ct) sendRequestNext, recvMsgRollBackward = \cp ct -> ChainSyncClient $ do - atomically $ writeTChan chan (RollBackward cp ct) + atomically $ writeTBQueue queue (RollBackward cp ct) sendRequestNext }