Skip to content

Commit

Permalink
Use a bounded channel inside withChainSyncEventStream
Browse files Browse the repository at this point in the history
Using a unbounded channel can cause memory issues with the consumer is
slower than the producer. Since the producer is a locally connected
node, this is most likely the case.
  • Loading branch information
andreabedini committed Jun 9, 2022
1 parent 6e560c8 commit d132917
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions plutus-streaming/src/Plutus/Streaming.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsg
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync)
import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue, writeTBQueue)
import Control.Exception (Exception, SomeException (SomeException), catch, throw)
import GHC.Generics (Generic)
import Streaming (Of, Stream)
Expand Down Expand Up @@ -48,14 +49,11 @@ withChainSyncEventStream socketPath networkId point consumer = do
-- The chain-sync client runs in a different thread and it will send us
-- block through this channel.

-- By using newBroadcastTChan, messages can be garbage collected after
-- clients have seen them, preventing pile up. The only way to read a
-- broadcast channel is to duplicate it with dupTChan. See note at
-- `newBroadcastTChan`.
chan <- newBroadcastTChanIO
readerChannel <- atomically $ dupTChan chan
-- On local testing we do about 1000 events per second next to a local
-- node. One second of buffer sounds reasonable to me.
queue <- newTBQueueIO 1000

let client = chainSyncStreamingClient point chan
let client = chainSyncStreamingClient point queue

localNodeClientProtocols =
LocalNodeClientProtocols
Expand Down Expand Up @@ -85,7 +83,7 @@ withChainSyncEventStream socketPath networkId point consumer = do
-- Make sure all exceptions in the client thread are passed to the consumer thread
link a
-- Run the consumer
consumer $ S.repeatM $ atomically (readTChan readerChannel)
consumer $ S.repeatM $ atomically (readTBQueue queue)
-- Let's rethrow exceptions from the client thread unwrapped, so that the
-- consumer does not have to know anything about async
`catch` \(ExceptionInLinkedThread _ (SomeException e)) -> throw e
Expand All @@ -101,9 +99,9 @@ withChainSyncEventStream socketPath networkId point consumer = do
-- client will throw a NoIntersectionFound exception.
chainSyncStreamingClient ::
ChainPoint ->
TChan (ChainSyncEvent e) ->
TBQueue (ChainSyncEvent e) ->
ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient point chan =
chainSyncStreamingClient point queue =
ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect
where
onIntersect =
Expand All @@ -123,10 +121,10 @@ chainSyncStreamingClient point chan =
ClientStNext
{ recvMsgRollForward = \bim ct ->
ChainSyncClient $ do
atomically $ writeTChan chan (RollForward bim ct)
atomically $ writeTBQueue queue (RollForward bim ct)
sendRequestNext,
recvMsgRollBackward = \cp ct ->
ChainSyncClient $ do
atomically $ writeTChan chan (RollBackward cp ct)
atomically $ writeTBQueue queue (RollBackward cp ct)
sendRequestNext
}

0 comments on commit d132917

Please sign in to comment.