Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fill command diagnostics #151

Merged
merged 26 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5c5a6c9
include possible fill error log file
emmanueldenloye Jun 5, 2023
504f86b
log errors and write miscounts to error log file
emmanueldenloye Jun 7, 2023
b0127f2
use S.logg from System.Logger
emmanueldenloye Jun 9, 2023
68b5acd
show chunk range
emmanueldenloye Jun 13, 2023
a68e695
create a separate log file for each chain
emmanueldenloye Jun 23, 2023
c7f3deb
make the appropriate changes everywhere for the error log file
emmanueldenloye Jun 23, 2023
f21435e
track missing gaps and print them at the end of the gaps run
emmanueldenloye Jun 29, 2023
bac603b
catch possible exception from headersBetween function
emmanueldenloye Jul 5, 2023
8b30364
add simple test of headersBetween
emmanueldenloye Jul 6, 2023
c00f38c
add simple test of headersBetween pt2
emmanueldenloye Jul 7, 2023
b6ccc88
use the service and not the p2p api for getting headers
emmanueldenloye Jul 7, 2023
ef084f4
make sure payloadWithOutputsBatch call uses service API
emmanueldenloye Jul 10, 2023
5274a35
add test to get gaps, fix getBlockGaps function
emmanueldenloye Jul 13, 2023
c391dcf
cleanup
emmanueldenloye Jul 13, 2023
aaf782e
hide db secrets
emmanueldenloye Jul 14, 2023
28e38aa
Update haskell-src/exec/Chainweb/Worker.hs
emmanueldenloye Jul 14, 2023
ad07c66
Merge branch 'emmanuel/fill-command-diagnostics' of github.com:kadena…
emmanueldenloye Jul 14, 2023
f437160
fix up writeBlocks function
emmanueldenloye Jul 14, 2023
7da6b0f
properly return the number of rows inserted in error log file
emmanueldenloye Jul 14, 2023
cf216fd
Update haskell-src/exec/Chainweb/Worker.hs
emmanueldenloye Jul 15, 2023
25360c1
Update haskell-src/exec/Chainweb/Gaps.hs
emmanueldenloye Jul 17, 2023
89697a2
fix previous suggestion
emmanueldenloye Jul 17, 2023
b74e94d
take out error logging
emmanueldenloye Jul 17, 2023
86d0425
Update haskell-src/exec/Chainweb/Server.hs
emmanueldenloye Jul 18, 2023
c0685f1
Update haskell-src/exec/Chainweb/Worker.hs
emmanueldenloye Jul 18, 2023
c3d928f
Update haskell-src/exec/Chainweb/Worker.hs
emmanueldenloye Jul 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 110 additions & 5 deletions haskell-src/exec/Chainweb/Gaps.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Chainweb.Gaps ( gaps ) where
module Chainweb.Gaps ( gaps, _test_headersBetween_and_payloadBatch ) where

import Chainweb.Api.ChainId (ChainId(..))
import Chainweb.Api.NodeInfo
import Chainweb.Api.BlockHeader
import ChainwebDb.Database
import ChainwebData.Env
import Chainweb.Lookups
Expand All @@ -14,6 +15,7 @@ import ChainwebData.Genesis
import ChainwebData.Types
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception (catch, SomeException(..))
import Control.Monad
import Control.Scheduler
import Data.Bool
Expand All @@ -23,9 +25,14 @@ import Data.Int
import qualified Data.Map.Strict as M
import Data.String
import Data.Text (Text)
import Data.Word (Word16)
import Database.Beam hiding (insert)
import Database.Beam.Postgres
import Network.Connection (TLSSettings(TLSSettingsSimple))
import Network.HTTP.Client
import Network.HTTP.Client.TLS
import System.Logger hiding (logg)
import qualified System.Logger as S
import System.Exit (exitFailure)
import Text.Printf

Expand Down Expand Up @@ -65,7 +72,9 @@ gapsCut env args cutBS = do
race_ (progress logg count totalNumBlocks)
(traverseConcurrently_ Par' doChain (M.toList gapsByChain))
final <- readIORef count
logg Info $ fromString $ printf "Filled in %d missing blocks." final
logg Info $ fromString
$ printf "Filled in %d missing blocks" final
++ if (totalNumBlocks > final) then (printf ", %d fewer than detected gaps" (totalNumBlocks - final)) else mempty
gapFiller
where
pool = _env_dbConnPool env
Expand All @@ -80,12 +89,108 @@ gapsCut env args cutBS = do
f :: LogFunctionIO Text -> IORef Int -> Int64 -> (Low, High) -> IO ()
f logger count cid (l, h) = do
let range = (ChainId (fromIntegral cid), l, h)
headersBetween env range >>= \case
let onCatch (e :: SomeException) = do
logger Error $
fromString $ printf "Caught exception from headersBetween for range %s: %s" (show range) (show e)
pure $ Right []
(headersBetween env range `catch` onCatch) >>= \case
Left e -> logger Error $ fromString $ printf "ApiError for range %s: %s" (show range) (show e)
Right [] -> logger Error $ fromString $ printf "headersBetween: %s" $ show range
Right hs -> writeBlocks env pool count hs
maybe mempty threadDelay delay

_test_headersBetween_and_payloadBatch :: IO ()
_test_headersBetween_and_payloadBatch = do
env <- testEnv
queryCut env >>= print
let ranges = rangeToDescGroupsOf blockHeaderRequestSize (Low 3817591) (High 3819591)
toRange c (Low l, High h) = (c, Low l, High h)
onCatch (e :: SomeException) = do
putStrLn $ "Caught exception from headersBetween: " <> show e
pure $ Right []
forM_ (take 1 ranges) $ \range -> (headersBetween env (toRange (ChainId 0) range) `catch` onCatch) >>= \case
Left e -> putStrLn $ "Error: " <> show e
Right hs -> do
putStrLn $ "Got " <> show (length hs) <> " headers"
let makeMap = M.fromList . map (\bh -> (hashToDbHash $ _blockHeader_payloadHash bh, _blockHeader_hash bh))
payloadWithOutputsBatch env (ChainId 0) (makeMap hs) id >>= \case
Left e -> putStrLn $ "Error: " <> show e
Right pls -> do
putStrLn $ "Got " <> show (length pls) <> " payloads"
where
testEnv :: IO Env
testEnv = do
manager <- newManager $ mkManagerSettings (TLSSettingsSimple True False False) Nothing
let urlHost_ = "localhost"
serviceUrlScheme = UrlScheme Http $ Url urlHost_ 1848
p2pUrl = Url urlHost_ 443
nodeInfo = NodeInfo
{
_nodeInfo_chainwebVer = "mainnet01"
, _nodeInfo_apiVer = undefined
, _nodeInfo_chains = undefined
, _nodeInfo_numChains = undefined
, _nodeInfo_graphs = Nothing
}
return Env -- these undefined fields are not used in the `headersBetween` function
{
_env_httpManager = manager
, _env_dbConnPool = undefined
, _env_serviceUrlScheme = serviceUrlScheme
, _env_p2pUrl = p2pUrl
, _env_nodeInfo = nodeInfo
, _env_chainsAtHeight = undefined
, _env_logger = undefined
}

_test_getBlockGaps
:: String -- host
-> Word16 -- port
-> String -- user
-> String -- password
-> String -- db name
-> IO ()
_test_getBlockGaps dbHost dbPort dbUser password dbName = withHandleBackend defaultHandleBackendConfig $ \backend ->
withLogger defaultLoggerConfig backend $ \logger -> do
let l = loggerFunIO logger
manager <- newManager $ mkManagerSettings (TLSSettingsSimple True False False) Nothing
let serviceUrlScheme = UrlScheme Http $ Url "localhost" 1848
getNodeInfo manager serviceUrlScheme >>= \case
Left _err -> l Error "Error getting node info"
Right ni -> do
let pgc = PGInfo $ ConnectInfo
{
connectHost = dbHost
, connectPort = dbPort
, connectUser = dbUser
, connectPassword = password
, connectDatabase = dbName
}

withCWDPool pgc $ \pool -> do
env <- getEnv ni l manager pool
let fromRight = either (error "fromRight: hit left") id
cutBS <- fromRight <$> queryCut env
minHeights <- getAndVerifyMinHeights env cutBS
gapsByChain <- getBlockGaps env minHeights
print gapsByChain
where
second f (a,b) = (a, f b)
fromMaybe b = \case
Just a -> a
Nothing -> b
getEnv ni lr m pool =
return Env
{
_env_httpManager = m
, _env_dbConnPool = pool
, _env_serviceUrlScheme = UrlScheme Http $ Url "localhost" 1848
, _env_p2pUrl = Url "localhost" 443
, _env_nodeInfo = ni
, _env_chainsAtHeight = fromMaybe (error "chainsAtMinHeight missing") $ map (second (map (ChainId . fst))) <$> (_nodeInfo_graphs ni)
, _env_logger = lr
}

getBlockGaps :: Env -> M.Map Int64 (Maybe Int64) -> IO (M.Map Int64 [(Int64,Int64)])
getBlockGaps env existingMinHeights = withDbDebug env Debug $ do
let toMap = M.fromListWith (<>) . map (\(cid,a,b) -> (cid,[(a,b)]))
Expand All @@ -111,8 +216,8 @@ getBlockGaps env existingMinHeights = withDbDebug env Debug $ do
maybeAppendGenesis mMin genesisheight =
case mMin of
Just min' -> case compare genesisheight min' of
GT -> Nothing
_ -> Just (genesisheight, min')
LT -> Just (genesisheight, min')
_ -> Nothing
Nothing -> Nothing
addStart mr xs = case mr of
Nothing -> xs
Expand Down
4 changes: 2 additions & 2 deletions haskell-src/exec/Chainweb/Lookups.hs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ headersBetween env (cid, Low low, High up) = do
pure $ (^.. key "items" . values . _String . to f . _Just) . responseBody <$> eresp
where
v = _nodeInfo_chainwebVer $ _env_nodeInfo env
url = showUrlScheme (UrlScheme Https $ _env_p2pUrl env) <> query
url = showUrlScheme (_env_serviceUrlScheme env) <> query
query = printf "/chainweb/0.0/%s/chain/%d/header?minheight=%d&maxheight=%d"
(T.unpack v) (unChainId cid) low up
encoding = [("accept", "application/json")]
Expand All @@ -141,7 +141,7 @@ payloadWithOutputsBatch env (ChainId cid) m _f = do
pure res
where
rest = T.pack $ "\nHashes: ( " ++ (L.intercalate " " $ M.elems (show . hashB64U . _f <$> m)) ++ " )"
url = showUrlScheme (UrlScheme Https $ _env_p2pUrl env) <> T.unpack query
url = showUrlScheme (_env_serviceUrlScheme env) <> T.unpack query
v = _nodeInfo_chainwebVer $ _env_nodeInfo env
query = "/chainweb/0.0/" <> v <> "/chain/" <> T.pack (show cid) <> "/payload/outputs/batch"
encoding = [("content-type", "application/json")]
Expand Down
4 changes: 2 additions & 2 deletions haskell-src/exec/Chainweb/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ scheduledUpdates env pool runFill fillDelay = forever $ do

when runFill $ do
logg Info "Filling missing blocks"
gaps env (FillArgs fillDelay)
gaps env (FillArgs fillDelay) -- TODO: This shouldn't be Nothing
emmanueldenloye marked this conversation as resolved.
Show resolved Hide resolved
logg Info "Fill finished"
where
micros = 1000000
Expand Down Expand Up @@ -381,7 +381,7 @@ toApiTxDetail tx contHist blk evs = TxDetail

getMaxBlockHeight :: LogFunctionIO Text -> Connection -> IO (Maybe BlockHeight)
getMaxBlockHeight logger c =
runBeamPostgresDebug (logger Debug . T.pack) c $
runBeamPostgresDebug (logger Debug . T.pack) c $
fmap f $ runSelectReturningOne $ select $ do
blk <- all_ (_cddb_blocks database)
return $ max_ (_block_height blk)
Expand Down
26 changes: 21 additions & 5 deletions haskell-src/exec/Chainweb/Worker.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE NumericUnderscores #-}
Expand Down Expand Up @@ -82,16 +83,28 @@ writes pool b ks ts es ss tf = P.withResource pool $ \c -> withTransaction c $ d
-- (unDbHash $ _block_hash b)
-- (map (const '.') ts)

batchWrites :: P.Pool Connection -> [Block] -> [[T.Text]] -> [[Transaction]] -> [[Event]] -> [[Signer]] -> [[Transfer]] -> IO ()
batchWrites
:: P.Pool Connection
-> [Block]
-> [[T.Text]]
-> [[Transaction]]
-> [[Event]]
-> [[Signer]]
-> [[Transfer]]
-> IO ()
batchWrites pool bs kss tss ess sss tfs = P.withResource pool $ \c -> withTransaction c $ do

runBeamPostgres c $ do
-- Write the Blocks if unique
runInsert
$ insert (_cddb_blocks database) (insertValues bs)
$ onConflict (conflictingFields primaryKey) onConflictDoNothing
-- Write Pub Key many-to-many relationships if unique --

let mks = concat $ zipWith (\b ks -> map (MinerKey (pk b)) ks) bs kss

runInsert
$ insert (_cddb_minerkeys database) (insertValues $ concat $ zipWith (\b ks -> map (MinerKey (pk b)) ks) bs kss)
$ insert (_cddb_minerkeys database) (insertValues mks)
$ onConflict (conflictingFields primaryKey) onConflictDoNothing

withSavepoint c $ do
Expand All @@ -104,9 +117,11 @@ batchWrites pool bs kss tss ess sss tfs = P.withResource pool $ \c -> withTransa
runInsert
$ insert (_cddb_events database) (insertValues $ concat ess)
$ onConflict (conflictingFields primaryKey) onConflictDoNothing

runInsert
$ insert (_cddb_signers database) (insertValues $ concat sss)
$ onConflict (conflictingFields primaryKey) onConflictDoNothing

runInsert
$ insert (_cddb_transfers database) (insertValues $ concat tfs)
$ onConflict (conflictingFields primaryKey) onConflictDoNothing
Expand Down Expand Up @@ -149,8 +164,8 @@ writeBlocks env pool count bhs = do
let ff bh = (hashToDbHash $ _blockHeader_payloadHash bh, _blockHeader_hash bh)
retrying policy check (const $ payloadWithOutputsBatch env chain (M.fromList (ff <$> bhs')) id) >>= \case
Left e -> do
logger Error $ fromString $ printf "Couldn't fetch payload batch for chain: %d" (unChainId chain)
logger Error $ fromString $ show e
logger Error $ fromString $ printf "Couldn't fetch payload batch for chain: %d" (unChainId chain)
logger Error $ fromString $ show e
emmanueldenloye marked this conversation as resolved.
Show resolved Hide resolved
Right pls' -> do
let !pls = M.fromList pls'
!ms = _blockPayloadWithOutputs_minerData <$> pls
Expand All @@ -166,7 +181,8 @@ writeBlocks env pool count bhs = do
err = printf "writeBlocks failed because we don't know how to work this version %s" version
withEventsMinHeight version err $ \evMinHeight -> do
let !tfs = M.intersectionWith (\pl bh -> mkTransferRows (fromIntegral $ _blockHeader_height bh) (_blockHeader_chainId bh) (DbHash $ hashB64U $ _blockHeader_hash bh) (posixSecondsToUTCTime $ _blockHeader_creationTime bh) pl evMinHeight) pls (makeBlockMap bhs')
batchWrites pool (M.elems bs) (M.elems kss) (M.elems tss) (M.elems ess) (M.elems sss) (M.elems tfs)
batchWrites pool (M.elems bs)
(M.elems kss) (M.elems tss) (M.elems ess) (M.elems sss) (M.elems tfs)
emmanueldenloye marked this conversation as resolved.
Show resolved Hide resolved
atomicModifyIORef' count (\n -> (n + numWrites, ()))
where

Expand Down
Loading