diff --git a/cardano-wallet.cabal b/cardano-wallet.cabal index 07b2e79a508..20b7ee5fa7f 100644 --- a/cardano-wallet.cabal +++ b/cardano-wallet.cabal @@ -45,6 +45,7 @@ library , servant , servant-client , text + , time-units , transformers hs-source-dirs: src @@ -53,6 +54,7 @@ library , Cardano.ChainProducer.RustHttpBridge.Client , Cardano.Wallet.Binary , Cardano.Wallet.Binary.Packfile + , Cardano.Wallet.BlockSyncer , Cardano.Wallet.Primitive , Servant.Extra.ContentTypes other-modules: @@ -98,7 +100,9 @@ test-suite unit , containers , hspec , memory + , hspec-expectations , QuickCheck + , time-units type: exitcode-stdio-1.0 hs-source-dirs: @@ -107,5 +111,6 @@ test-suite unit Main.hs other-modules: Cardano.Wallet.BinarySpec - Cardano.Wallet.Binary.PackfileSpec - Cardano.Wallet.PrimitiveSpec + , Cardano.Wallet.Binary.PackfileSpec + , Cardano.Wallet.PrimitiveSpec + , Cardano.Wallet.BlockSyncerSpec diff --git a/src/Cardano/Wallet/BlockSyncer.hs b/src/Cardano/Wallet/BlockSyncer.hs new file mode 100644 index 00000000000..93cef0c3e92 --- /dev/null +++ b/src/Cardano/Wallet/BlockSyncer.hs @@ -0,0 +1,69 @@ +{-# LANGUAGE ScopedTypeVariables #-} + +-- | +-- Copyright: © 2018-2019 IOHK +-- License: MIT +-- +-- This module contains the ticking function that is responsible for invoking +-- block acquisition functionality and executing it in periodic fashion. +-- +-- Known limitations: the ticking function makes sure action is not executed on +-- already consumed block, but does not check and handle block gaps (aka +-- catching up). + +module Cardano.Wallet.BlockSyncer + ( + BlockHeadersConsumed(..) + , tickingFunction + ) where + + +import Prelude + +import Cardano.Wallet.Primitive + ( Block (..), BlockHeader ) +import Control.Concurrent + ( threadDelay ) +import Data.Time.Units + ( Millisecond, toMicroseconds ) + +import qualified Data.List as L + + +newtype BlockHeadersConsumed = + BlockHeadersConsumed [BlockHeader] + deriving (Show, Eq) + +storingLimit :: Int +storingLimit = 2160 + +tickingFunction + :: IO [Block] + -- ^ a way to get a new block + -> (Block -> IO ()) + -- ^ action taken on a new block + -> Millisecond + -- ^ tick time + -> BlockHeadersConsumed + -> IO () +tickingFunction getNextBlocks action tickTime = go + where + go + :: BlockHeadersConsumed + -> IO () + go (BlockHeadersConsumed headersConsumed) = do + blocksDownloaded <- getNextBlocks + let blocksToProcess = + filter (checkIfAlreadyConsumed headersConsumed) (L.nub blocksDownloaded) + mapM_ action blocksToProcess + threadDelay $ (fromIntegral . toMicroseconds) tickTime + go $ BlockHeadersConsumed + $ take storingLimit + $ map header blocksToProcess ++ headersConsumed + + checkIfAlreadyConsumed + :: [BlockHeader] + -> Block + -> Bool + checkIfAlreadyConsumed consumedHeaders (Block theHeader _) = + theHeader `L.notElem` consumedHeaders diff --git a/src/Cardano/Wallet/Primitive.hs b/src/Cardano/Wallet/Primitive.hs index 39ef2f96927..bd273800610 100644 --- a/src/Cardano/Wallet/Primitive.hs +++ b/src/Cardano/Wallet/Primitive.hs @@ -88,13 +88,13 @@ import qualified Data.Set as Set newtype EpochId = EpochId { getEpochId :: Word64 - } deriving (Eq, Generic, NFData, Num, Show) + } deriving (Eq, Generic, NFData, Num, Ord, Show) -- * Slot newtype SlotId = SlotId { getSlotId :: Word16 - } deriving (Eq, Generic, NFData, Num, Show) + } deriving (Eq, Generic, NFData, Num, Ord, Show) -- * Block @@ -103,7 +103,7 @@ data Block = Block :: !BlockHeader , transactions :: !(Set Tx) - } deriving (Show, Eq, Generic) + } deriving (Show, Eq, Ord, Generic) instance NFData Block @@ -114,7 +114,7 @@ data BlockHeader = BlockHeader :: !SlotId , prevBlockHash :: !(Hash "BlockHeader") - } deriving (Show, Eq, Generic) + } deriving (Show, Eq, Ord, Generic) instance NFData BlockHeader diff --git a/stack.yaml b/stack.yaml index 806baf9acff..234deb6e4d8 100644 --- a/stack.yaml +++ b/stack.yaml @@ -5,6 +5,7 @@ packages: extra-deps: - base58-bytestring-0.1.0 - generic-lens-1.1.0.0 +- time-units-1.0.0 - git: https://github.com/input-output-hk/cardano-crypto commit: 3c5db489c71a4d70ee43f5f9b979fcde3c797f2a diff --git a/test/unit/Cardano/Wallet/BlockSyncerSpec.hs b/test/unit/Cardano/Wallet/BlockSyncerSpec.hs new file mode 100644 index 00000000000..cc50e32a806 --- /dev/null +++ b/test/unit/Cardano/Wallet/BlockSyncerSpec.hs @@ -0,0 +1,191 @@ +{-# OPTIONS_GHC -fno-warn-unused-imports #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE LambdaCase #-} + +module Cardano.Wallet.BlockSyncerSpec + ( spec + , groups + , duplicateMaybe + ) where + + +import Prelude + +import Cardano.Wallet.BlockSyncer + ( BlockHeadersConsumed (..), tickingFunction ) +import Cardano.Wallet.Primitive + ( Block (..), BlockHeader (..), EpochId (..), Hash (..), SlotId (..) ) +import Control.Concurrent + ( ThreadId, forkIO, killThread, threadDelay ) +import Control.Concurrent.MVar + ( MVar, modifyMVar_, newEmptyMVar, newMVar, putMVar, takeMVar ) +import Control.Monad + ( foldM, forM_, (>=>) ) +import Control.Monad.IO.Class + ( liftIO ) +import Data.ByteString + ( ByteString, pack ) +import Data.Functor + ( ($>) ) +import Data.Map.Strict + ( Map ) +import Data.Time.Units + ( Millisecond, fromMicroseconds ) +import Data.Tuple + ( swap ) +import Test.Hspec + ( Arg, Spec, SpecWith, describe, it, shouldReturn ) +import Test.QuickCheck + ( Arbitrary (..) + , Property + , elements + , generate + , property + , vector + , withMaxSuccess + ) +import Test.QuickCheck.Gen + ( Gen, choose, vectorOf ) +import Test.QuickCheck.Monadic + ( monadicIO ) + +import qualified Codec.CBOR.Encoding as CBOR +import qualified Codec.CBOR.Write as CBOR +import qualified Data.List as L +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set + + +spec :: Spec +spec = do + describe "Block syncer downloads blocks properly" $ do + it "Check ticking function when blocks are sent" + (withMaxSuccess 10 $ property tickingFunctionTest) + + +{------------------------------------------------------------------------------- + Test Logic +-------------------------------------------------------------------------------} + +tickingFunctionTest + :: (TickingTime, Blocks) + -> Property +tickingFunctionTest (TickingTime tickTime, Blocks blocks) = monadicIO $ liftIO $ do + (readerChan, reader) <- mkReader + (writerChan, writer) <- mkWriter blocks + waitFor writerChan $ tickingFunction writer reader tickTime (BlockHeadersConsumed []) + takeMVar readerChan `shouldReturn` L.nub (reverse $ mconcat blocks) + +waitFor + :: MVar () + -> IO () + -> IO () +waitFor done action = do + threadId <- forkIO action + _ <- takeMVar done + killThread threadId + +mkWriter + :: [[a]] + -> IO (MVar (), IO [a]) +mkWriter xs0 = do + ref <- newMVar xs0 + done <- newEmptyMVar + return + ( done + , takeMVar ref >>= \case + [] -> putMVar done () $> [] + h:q -> putMVar ref q $> h + ) + +mkReader + :: IO (MVar [a], a -> IO ()) +mkReader = do + ref <- newMVar [] + return + ( ref + , \x -> modifyMVar_ ref $ return . (x :) + ) + +{------------------------------------------------------------------------------- + Arbitrary Instances +-------------------------------------------------------------------------------} + + +newtype TickingTime = TickingTime Millisecond + deriving (Show) + +instance Arbitrary TickingTime where + -- No shrinking + arbitrary = do + tickTime <- fromMicroseconds . (* 1000) <$> choose (50, 100) + return $ TickingTime tickTime + + +newtype Blocks = Blocks [[Block]] + deriving Show + +instance Arbitrary Blocks where + -- No Shrinking + arbitrary = do + n <- arbitrary + let h0 = BlockHeader 1 0 (Hash "initial block") + let blocks = map snd $ take n $ iterate next + ( blockHeaderHash h0 + , Block h0 mempty + ) + mapM duplicateMaybe blocks >>= fmap Blocks . groups . mconcat + where + next :: (Hash "BlockHeader", Block) -> (Hash "BlockHeader", Block) + next (prev, b) = + let + epoch = epochIndex (header b) + slot = slotNumber (header b) + 1 + h = BlockHeader epoch slot prev + in + (blockHeaderHash h, Block h mempty) + + blockHeaderHash :: BlockHeader -> Hash "BlockHeader" + blockHeaderHash = + Hash . CBOR.toStrictByteString . encodeBlockHeader + where + encodeBlockHeader (BlockHeader (EpochId epoch) (SlotId slot) prev) = mempty + <> CBOR.encodeListLen 3 + <> CBOR.encodeWord64 epoch + <> CBOR.encodeWord16 slot + <> CBOR.encodeBytes (getHash prev) + + +-- | Construct arbitrary groups of elements from a given list. +-- +-- >>> generate $ groups [0,1,2,3,4,5,6,7,8,9] +-- [[0,1],[2,3],[4,5,6],[7,8,9]] +-- +-- +-- >>> generate $ groups [0,1,2,3,4,5,6,7,8,9] +-- [[],[0],[1,2,3,4,5,6,7,8],[9]] +-- +groups :: [a] -> Gen [[a]] +groups = fmap reverse . foldM arbitraryGroup [[]] + where + arbitraryGroup :: [[a]] -> a -> Gen [[a]] + arbitraryGroup [] _ = return [] -- Can't happen with the given initial value + arbitraryGroup (grp:rest) a = do + choose (1 :: Int, 3) >>= \case + 1 -> return $ [a]:grp:rest + _ -> return $ (grp ++ [a]):rest + +-- | Generate a singleton or a pair from a given element. +-- +-- >>> generate $ duplicateMaybe 14 +-- [14] +-- +-- >>> generate $ duplicateMaybe 14 +-- [14, 14] +-- +duplicateMaybe :: a -> Gen [a] +duplicateMaybe a = do + predicate <- arbitrary + if predicate then return [a, a] else return [a]