diff --git a/cabal.project b/cabal.project index 74a929fb48..0599430dd7 100644 --- a/cabal.project +++ b/cabal.project @@ -16,6 +16,7 @@ packages: doc plutus-playground-server plutus-script-utils plutus-use-cases + plutus-streaming quickcheck-dynamic web-ghc diff --git a/plutus-streaming/README.md b/plutus-streaming/README.md new file mode 100644 index 0000000000..fab406ce3f --- /dev/null +++ b/plutus-streaming/README.md @@ -0,0 +1,42 @@ +# Plutus Streaming + +plutus-streaming is a simple library that wraps cardano-api giving a +streaming interface to the chain-sync protocol. This means you can use this +library to stream blocks from a locally running node. + +The blocks are presented as received from the chain-sync protocol so you +will still need to use function from `Cardano.Api` (in cardano-node) or +`Plutus.Ledger`(in plutus-ledger) to extract information from blocks. + +# Example applications + +## Example 1 + +This application simply streams to stdout events in JSON format. + +## Example 2 + +This application uses plutus-ledger to extract transactions (transaction +id, ins and outs) and datums. Then it prints them to stdout in JSON format. + +## Example 3 + +This application uses folds the stream into a UTxoState as defined in +plutus-chain-index-core. + +# Running the example applications + +You can run the example applications with cabal. Remember you need to have +a local node running and reachable through the provided socket. + +From Genesis + +``` +$ cabal run -- plutus-streaming-example-1 --socket-path /tmp/node.socket --mainnet +``` + +Passing a starting point + +``` +$ cabal run -- plutus-streaming-example-1 --socket-path /tmp/node.socket --slot-no 53427524 --block-hash 5e2bde4e504a9888a4f218dafc79a7619083f97d48684fcdba9dc78190df8f99 +``` diff --git a/plutus-streaming/examples/Common.hs b/plutus-streaming/examples/Common.hs new file mode 100644 index 0000000000..fda74c185f --- /dev/null +++ b/plutus-streaming/examples/Common.hs @@ -0,0 +1,77 @@ +{-# LANGUAGE GADTs #-} +{-# LANGUAGE RankNTypes #-} +module Common where + +import Cardano.Api qualified +import Data.Aeson qualified +import Data.ByteString.Lazy.Char8 qualified +import Options.Applicative (Alternative ((<|>)), Parser, auto, execParser, flag', help, helper, info, long, metavar, + option, str, strOption, (<**>)) +import Orphans () +import Streaming.Prelude qualified as S + +-- +-- Options parsing +-- + +data Options = Options + { optionsSocketPath :: String, + optionsNetworkId :: Cardano.Api.NetworkId, + optionsChainPoint :: Cardano.Api.ChainPoint + } + deriving (Show) + +optionsParser :: Parser Options +optionsParser = + Options + <$> strOption (long "socket-path" <> help "Node socket path") + <*> networkIdParser + <*> chainPointParser + +networkIdParser :: Parser Cardano.Api.NetworkId +networkIdParser = + pMainnet' <|> fmap Cardano.Api.Testnet testnetMagicParser + where + pMainnet' :: Parser Cardano.Api.NetworkId + pMainnet' = + flag' + Cardano.Api.Mainnet + ( long "mainnet" + <> help "Use the mainnet magic id." + ) + +testnetMagicParser :: Parser Cardano.Api.NetworkMagic +testnetMagicParser = + Cardano.Api.NetworkMagic + <$> option + auto + ( long "testnet-magic" + <> metavar "NATURAL" + <> help "Specify a testnet magic id." + ) + +chainPointParser :: Parser Cardano.Api.ChainPoint +chainPointParser = + pure Cardano.Api.ChainPointAtGenesis + <|> ( Cardano.Api.ChainPoint + <$> option (Cardano.Api.SlotNo <$> auto) (long "slot-no" <> metavar "SLOT-NO") + <*> option str (long "block-hash" <> metavar "BLOCK-HASH") + ) + +parseOptions :: IO Options +parseOptions = execParser $ info (optionsParser <**> helper) mempty + +printJson :: Data.Aeson.ToJSON a => S.Stream (S.Of a) IO r -> IO r +printJson = S.mapM_ Data.ByteString.Lazy.Char8.putStrLn . S.map Data.Aeson.encode + +-- https://github.com/input-output-hk/cardano-node/pull/3665 +workaround :: + (Cardano.Api.IsCardanoEra era => Cardano.Api.EraInMode era Cardano.Api.CardanoMode -> a) -> + Cardano.Api.EraInMode era Cardano.Api.CardanoMode -> + a +workaround k Cardano.Api.ByronEraInCardanoMode = k Cardano.Api.ByronEraInCardanoMode +workaround k Cardano.Api.ShelleyEraInCardanoMode = k Cardano.Api.ShelleyEraInCardanoMode +workaround k Cardano.Api.AllegraEraInCardanoMode = k Cardano.Api.AllegraEraInCardanoMode +workaround k Cardano.Api.MaryEraInCardanoMode = k Cardano.Api.MaryEraInCardanoMode +workaround k Cardano.Api.AlonzoEraInCardanoMode = k Cardano.Api.AlonzoEraInCardanoMode + diff --git a/plutus-streaming/examples/Example1.hs b/plutus-streaming/examples/Example1.hs new file mode 100644 index 0000000000..0a107eb4ad --- /dev/null +++ b/plutus-streaming/examples/Example1.hs @@ -0,0 +1,27 @@ +module Main where + +import Cardano.Api qualified +import Common (Options (Options, optionsChainPoint, optionsNetworkId, optionsSocketPath), parseOptions) +import Data.Aeson.Text qualified as Aeson +import Data.Text.Lazy qualified as TL +import Orphans () +import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), withChainSyncEventStream) +import Streaming.Prelude qualified as S + +-- +-- Main +-- + +main :: IO () +main = do + Options {optionsSocketPath, optionsNetworkId, optionsChainPoint} <- parseOptions + + withChainSyncEventStream optionsSocketPath optionsNetworkId optionsChainPoint $ + S.stdoutLn + . S.map + ( \case + RollForward (Cardano.Api.BlockInMode (Cardano.Api.Block header _txs) _era) _ct -> + "RollForward, header: " <> TL.unpack (Aeson.encodeToLazyText header) + RollBackward cp _ct -> + "RollBackward, point: " <> TL.unpack (Aeson.encodeToLazyText cp) + ) diff --git a/plutus-streaming/examples/Example2.hs b/plutus-streaming/examples/Example2.hs new file mode 100644 index 0000000000..7360687c64 --- /dev/null +++ b/plutus-streaming/examples/Example2.hs @@ -0,0 +1,56 @@ +module Main where + +import Cardano.Api qualified +import Common (Options (Options, optionsChainPoint, optionsNetworkId, optionsSocketPath), parseOptions, printJson, + workaround) +import Data.Foldable (toList) +import Ledger qualified +import Orphans () +import Plutus.Script.Utils.V1.Scripts qualified +import Plutus.Streaming (withChainSyncEventStream) +import Streaming.Prelude qualified as S + +-- +-- Accessory functions +-- + +transactions :: + Cardano.Api.BlockInMode Cardano.Api.CardanoMode -> + [Ledger.CardanoTx] +transactions (Cardano.Api.BlockInMode (Cardano.Api.Block _ txs) eim) = + map (\tx -> Ledger.CardanoApiTx (workaround (Ledger.SomeTx tx) eim)) txs + +txInsAndOuts :: + Ledger.CardanoTx -> + (Ledger.TxId, [Ledger.TxOutRef], [Ledger.TxOutRef]) +txInsAndOuts tx = (txId, inRefs, outRefs) + where + txId = Ledger.getCardanoTxId tx + inRefs = map Ledger.txInRef $ toList $ Ledger.getCardanoTxInputs tx + outRefs = map snd $ Ledger.getCardanoTxOutRefs tx + +datums :: + Ledger.CardanoTx -> + [(Plutus.Script.Utils.V1.Scripts.DatumHash, Plutus.Script.Utils.V1.Scripts.Datum)] +datums tx = do + let txIns = toList $ Ledger.getCardanoTxInputs tx + (Ledger.TxIn _ (Just (Ledger.ConsumeScriptAddress _validator _redeemer datum))) <- txIns + pure (Plutus.Script.Utils.V1.Scripts.datumHash datum, datum) + +-- +-- Main +-- + +main :: IO () +main = do + Options {optionsSocketPath, optionsNetworkId, optionsChainPoint} <- parseOptions + + withChainSyncEventStream optionsSocketPath optionsNetworkId optionsChainPoint $ + printJson + . S.map -- Each ChainSyncEvent + ( fmap -- Inside the payload of RollForward events + ( fmap -- Each transaction + (\tx -> (txInsAndOuts tx, datums tx)) + . transactions + ) + ) diff --git a/plutus-streaming/examples/Example3.hs b/plutus-streaming/examples/Example3.hs new file mode 100644 index 0000000000..f9bcb73e0f --- /dev/null +++ b/plutus-streaming/examples/Example3.hs @@ -0,0 +1,53 @@ +module Main where + +import Cardano.Api qualified +import Common (Options (Options, optionsChainPoint, optionsNetworkId, optionsSocketPath), parseOptions) +import Plutus.ChainIndex (TxUtxoBalance) +import Plutus.ChainIndex.Compatibility qualified as CI +import Plutus.ChainIndex.TxUtxoBalance qualified as TxUtxoBalance +import Plutus.ChainIndex.UtxoState (UtxoIndex, UtxoState) +import Plutus.ChainIndex.UtxoState qualified as UtxoState +import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), withChainSyncEventStream) +import Streaming (Of, Stream) +import Streaming.Prelude qualified as S + +utxoState :: + Monad m => + Stream (Of (ChainSyncEvent (Cardano.Api.BlockInMode Cardano.Api.CardanoMode))) m r -> + Stream (Of (ChainSyncEvent (Cardano.Api.BlockInMode Cardano.Api.CardanoMode), UtxoState TxUtxoBalance)) m r +utxoState = + S.scanned step initial projection + where + step index (RollForward block _) = + case CI.fromCardanoBlock block of + Left err -> error ("FromCardanoError: " <> show err) + Right txs -> + let tip = CI.tipFromCardanoBlock block + balance = TxUtxoBalance.fromBlock tip txs + in case UtxoState.insert balance index of + Left err -> + error (show err) + Right (UtxoState.InsertUtxoSuccess newIndex _insertPosition) -> + newIndex + step index (RollBackward cardanoPoint _) = + let point = CI.fromCardanoPoint cardanoPoint + in case TxUtxoBalance.rollback point index of + Left err -> error (show err) + Right (UtxoState.RollbackResult _newTip rolledBackIndex) -> + rolledBackIndex + + initial :: UtxoIndex TxUtxoBalance + initial = mempty + + projection = UtxoState.utxoState + +-- +-- Main +-- + +main :: IO () +main = do + Options {optionsSocketPath, optionsNetworkId, optionsChainPoint} <- parseOptions + + withChainSyncEventStream optionsSocketPath optionsNetworkId optionsChainPoint $ + S.print . utxoState diff --git a/plutus-streaming/examples/Orphans.hs b/plutus-streaming/examples/Orphans.hs new file mode 100644 index 0000000000..119bdd54e8 --- /dev/null +++ b/plutus-streaming/examples/Orphans.hs @@ -0,0 +1,38 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +module Orphans where + +import Cardano.Api (BlockHeader (BlockHeader), BlockNo, ChainPoint (ChainPoint, ChainPointAtGenesis), + HasTypeProxy (proxyToAsType), Hash, SerialiseAsRawBytes (deserialiseFromRawBytes), ToJSON) +import Data.ByteString.Base16 qualified as Base16 +import Data.ByteString.Char8 qualified as C8 +import Data.Proxy (Proxy (Proxy)) +import Data.String (IsString (fromString)) +import GHC.Generics (Generic) +import Plutus.Streaming (ChainSyncEvent) + +-- https://github.com/input-output-hk/cardano-node/pull/3608 +instance IsString (Hash BlockHeader) where + fromString = either error id . deserialiseFromRawBytesBase16 . C8.pack + where + deserialiseFromRawBytesBase16 str = + case Base16.decode str of + Right raw -> case deserialiseFromRawBytes ttoken raw of + Just x -> Right x + Nothing -> Left ("cannot deserialise " ++ show str) + Left msg -> Left ("invalid hex " ++ show str ++ ", " ++ msg) + where + ttoken = proxyToAsType (Proxy :: Proxy a) + +deriving instance Generic ChainPoint + +instance ToJSON ChainPoint + +instance ToJSON BlockNo + +deriving instance Generic BlockHeader + +instance ToJSON BlockHeader + +instance ToJSON a => ToJSON (ChainSyncEvent a) diff --git a/plutus-streaming/plutus-streaming.cabal b/plutus-streaming/plutus-streaming.cabal new file mode 100644 index 0000000000..52ac8d8922 --- /dev/null +++ b/plutus-streaming/plutus-streaming.cabal @@ -0,0 +1,93 @@ +cabal-version: 2.4 +name: plutus-streaming +version: 0.1.0.0 +author: Andrea Bedini +maintainer: andrea.bedini@iohk.io +extra-source-files: CHANGELOG.md + +common lang + default-language: Haskell2010 + default-extensions: + DeriveFoldable + DeriveFunctor + DeriveGeneric + DeriveLift + DeriveTraversable + ExplicitForAll + GeneralizedNewtypeDeriving + ImportQualifiedPost + LambdaCase + NamedFieldPuns + ScopedTypeVariables + StandaloneDeriving + ghc-options: + -Wall + -Widentities + -Wincomplete-record-updates + -Wincomplete-uni-patterns + -Wmissing-import-lists + -Wnoncanonical-monad-instances + -Wredundant-constraints + -Wunused-packages + +library + import: lang + hs-source-dirs: src + exposed-modules: + Plutus.Streaming + build-depends: + base >=4.9 && <5, + async, + cardano-api, + ouroboros-network, + stm, + streaming, + +executable plutus-streaming-example-1 + import: lang + hs-source-dirs: examples + main-is: Example1.hs + other-modules: Common, Orphans + build-depends: + plutus-streaming, + base >=4.9 && <5, + aeson, + base16-bytestring, + bytestring, + cardano-api, + optparse-applicative, + streaming, + text + +executable plutus-streaming-example-2 + import: lang + hs-source-dirs: examples + main-is: Example2.hs + other-modules: Common, Orphans + build-depends: + plutus-streaming, + base >=4.9 && <5, + aeson, + base16-bytestring, + bytestring, + cardano-api, + optparse-applicative, + plutus-ledger, + plutus-script-utils, + streaming, + +executable plutus-streaming-example-3 + import: lang + hs-source-dirs: examples + main-is: Example3.hs + other-modules: Common, Orphans + build-depends: + plutus-streaming, + base >=4.9 && <5, + aeson, + base16-bytestring, + bytestring, + cardano-api, + optparse-applicative, + plutus-chain-index-core, + streaming, diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs new file mode 100644 index 0000000000..6acaa6eaa7 --- /dev/null +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -0,0 +1,129 @@ +module Plutus.Streaming + ( withChainSyncEventStream, + ChainSyncEvent (..), + ChainSyncEventException (..), + ) +where + +import Cardano.Api (BlockInMode, CardanoMode, ChainPoint, ChainSyncClient (ChainSyncClient), ChainTip, + ConsensusModeParams (CardanoModeParams), EpochSlots (EpochSlots), + LocalChainSyncClient (LocalChainSyncClient), + LocalNodeClientProtocols (LocalNodeClientProtocols, localChainSyncClient, localStateQueryClient, localTxSubmissionClient), + LocalNodeConnectInfo (LocalNodeConnectInfo, localConsensusModeParams, localNodeNetworkId, localNodeSocketPath), + NetworkId, connectToLocalNode) +import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsgRequestNext), + ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound), + ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward)) +import Control.Concurrent.Async (link, withAsync) +import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan) +import Control.Exception (Exception, throw) +import GHC.Generics (Generic) +import Streaming (Of, Stream) +import Streaming.Prelude qualified as S + +data ChainSyncEvent a + = RollForward a ChainTip + | RollBackward ChainPoint ChainTip + deriving (Show, Functor, Generic) + +data ChainSyncEventException + = NoIntersectionFound + deriving (Show) + +instance Exception ChainSyncEventException + +-- | `withChainSyncEventStream` uses the chain-sync mini-protocol to +-- connect to a locally running node and fetch blocks from the given +-- starting point. +withChainSyncEventStream :: + -- | Path to the node socket + FilePath -> + NetworkId -> + -- | The point on the chain to start streaming from + ChainPoint -> + -- | Stream consumer + (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b) -> + IO b +withChainSyncEventStream socketPath networkId point consumer = do + -- The chain-sync client runs in a different thread and it will send us + -- block through this channel. + + -- By using newBroadcastTChan, messages can be garbage collected after + -- clients have seen them, preventing pile up. The only way to read a + -- broadcast channel is to duplicate it with dupTChan. See note at + -- `newBroadcastTChan`. + chan <- newBroadcastTChanIO + readerChannel <- atomically $ dupTChan chan + + let client = chainSyncStreamingClient point chan + + localNodeClientProtocols = + LocalNodeClientProtocols + { localChainSyncClient = LocalChainSyncClient client, + localTxSubmissionClient = Nothing, + localStateQueryClient = Nothing + } + + connectInfo = + LocalNodeConnectInfo + { localConsensusModeParams = CardanoModeParams epochSlots, + localNodeNetworkId = networkId, + localNodeSocketPath = socketPath + } + + -- FIXME this comes from the config file but Cardano.Api does not expose readNetworkConfig! + epochSlots = EpochSlots 40 + + clientThread = do + connectToLocalNode connectInfo localNodeClientProtocols + -- the only reason connectToLocalNode can terminate successfully is if it + -- doesn't find an intersection, we report that case to the + -- consumer as an exception + throw NoIntersectionFound + + withAsync clientThread $ \a -> do + -- Make sure all exceptions in the client thread are passed to the consumer thread + link a + -- Run the consumer + consumer $ S.repeatM $ atomically (readTChan readerChannel) + +-- | `chainSyncStreamingClient` is the client that connects to a local node +-- and runs the chain-sync mini-protocol. This client is fire-and-forget +-- and does not require any control. +-- +-- Blocks obtained from the chain-sync mini-protocol are passed to a +-- consumer through a channel. +-- +-- If the starting point is such that an intersection cannot be found, this +-- client will throw a NoIntersectionFound exception. +chainSyncStreamingClient :: + ChainPoint -> + TChan (ChainSyncEvent e) -> + ChainSyncClient e ChainPoint ChainTip IO () +chainSyncStreamingClient point chan = + ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect + where + onIntersect = + ClientStIntersect + { recvMsgIntersectFound = \_ _ -> + ChainSyncClient sendRequestNext, + recvMsgIntersectNotFound = \_ -> + ChainSyncClient $ + -- There is nothing we can do here + throw NoIntersectionFound + } + + sendRequestNext = + pure $ SendMsgRequestNext onNext (pure onNext) + where + onNext = + ClientStNext + { recvMsgRollForward = \bim ct -> + ChainSyncClient $ do + atomically $ writeTChan chan (RollForward bim ct) + sendRequestNext, + recvMsgRollBackward = \cp ct -> + ChainSyncClient $ do + atomically $ writeTChan chan (RollBackward cp ct) + sendRequestNext + }