Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[inferno-vc] Cached client and server logging. More granular cache locks. Cleanup #143

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions inferno-vc/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# Revision History for inferno-vc
*Note*: we use https://pvp.haskell.org/ (MAJOR.MAJOR.MINOR.PATCH)

## 0.3.8.0 -- 2024-10-16
* Added logging to cached client to see hits and misses
* Added logging to server to see what scriptIds are being used to request
fetchObjects and fetchObjectClosureHashes
* Made locks on cache more granular and only fetch a single upstream object per
request

## 0.3.7.1 -- 2024-09-23
* Fix overflowing threadDelay on armv7l

Expand Down
3 changes: 2 additions & 1 deletion inferno-vc/inferno-vc.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: >=1.10
name: inferno-vc
version: 0.3.7.1
version: 0.3.8.0
synopsis: Version control server for Inferno
description: A version control server for Inferno scripts
category: DSL,Scripting
Expand Down Expand Up @@ -69,6 +69,7 @@ library
, QuickCheck
, stm
, unbounded-delays
, random-shuffle

default-language: Haskell2010
default-extensions:
Expand Down
213 changes: 137 additions & 76 deletions inferno-vc/src/Inferno/VersionControl/Client/Cached.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import Control.Concurrent.STM
retry,
writeTVar,
)
import Control.Monad (forM, forM_)
import Control.Monad.Catch (MonadMask, bracket_)
import Control.Monad (forM, forM_, guard)
import Control.Monad.Catch (MonadMask, bracket_, tryJust)
import Control.Monad.Error.Lens (throwing)
import Control.Monad.Except (MonadError (..))
import Control.Monad.IO.Class (MonadIO (..))
Expand All @@ -27,15 +27,14 @@ import Data.Aeson (FromJSON, ToJSON, eitherDecodeStrict, encode)
import qualified Data.ByteString as B
import qualified Data.ByteString.Base64.URL as Base64
import qualified Data.ByteString.Char8 as Char8
import qualified Data.ByteString.Lazy as BL
import Data.Either (partitionEithers)
import qualified Data.ByteString.Lazy.Char8 as BL
import Data.Generics.Product (HasType, getTyped)
import Data.Generics.Sum (AsType (..))
import Data.List (foldl')
import qualified Data.Map as Map
import qualified Data.Set as Set
import GHC.Generics (Generic)
import qualified Inferno.VersionControl.Client as VCClient
import Inferno.VersionControl.Log (VCCacheTrace (..))
import Inferno.VersionControl.Operations.Error (VCStoreError (..))
import Inferno.VersionControl.Server (VCServerError)
import Inferno.VersionControl.Types
Expand All @@ -44,44 +43,72 @@ import Inferno.VersionControl.Types
VCObjectHash (..),
vcObjectHashToByteString,
)
import Plow.Logging (IOTracer (..), traceWith)
import Servant.Client (ClientEnv, ClientError)
import Servant.Typed.Error (TypedClientM, runTypedClientM)
import System.AtomicWrite.Writer.LazyByteString (atomicWriteFile)
import System.Directory (createDirectoryIfMissing, doesFileExist)
import System.Directory (createDirectoryIfMissing)
import System.FilePath.Posix ((</>))
import System.IO.Error (isDoesNotExistError)
import System.Random.Shuffle (shuffleM)

data VCCacheEnv = VCCacheEnv
{ cachePath :: FilePath,
cacheInFlight :: TVar (Set.Set VCObjectHash)
cacheObjInFlight :: TVar (Set.Set VCObjectHash),
cacheDepInFlight :: TVar (Set.Set VCObjectHash),
tracer :: IOTracer VCCacheTrace
}
deriving (Generic)

-- | Makes sure only one thread at a time fetches the closure for certain
-- VCObjectHashes
withInFlight :: (MonadMask m, MonadIO m) => VCCacheEnv -> [VCObjectHash] -> m a -> m a
withInFlight VCCacheEnv {cacheInFlight} hashes = bracket_ acquire release
-- | Prevents thundering-herd problem by locking the key 'k' so only a single
-- 'fetchAndSave' runs concurrently
withSingleConcurrentFetch ::
(MonadMask m, MonadIO m, Ord k) =>
-- | A 'TVar' holding the set of keys which are currently being fetched
TVar (Set.Set k) ->
-- | A function that returns Just the value being cached if found locally or
-- Nothing if it hasn't being cached yet. No lock is taken when calling this
-- function
(k -> m (Maybe a)) ->
-- | A function that fetches the value and caches it. A lock is taken so only
-- a single call per key to this function runs concurrently
(k -> m a) ->
-- | The key associated to the value being cached
k ->
m a
withSingleConcurrentFetch keySetRef check fetchAndSave key =
check key >>= \case
Just x -> pure x
Nothing ->
bracket_ acquire release $
-- check again because another thread may have fetched while we were
-- blocked
check key >>= \case
Just x -> pure x
Nothing -> fetchAndSave key
where
acquire = liftIO $ atomically $ do
inFlight <- readTVar cacheInFlight
if any (`Set.member` inFlight) hashes
inFlight <- readTVar keySetRef
if key `Set.member` inFlight
then retry
else do
writeTVar cacheInFlight $! foldl' (flip Set.insert) inFlight hashes
writeTVar keySetRef $! key `Set.insert` inFlight
release = liftIO $ atomically $ do
inFlight <- readTVar cacheInFlight
writeTVar cacheInFlight $! foldl' (flip Set.delete) inFlight hashes
inFlight <- readTVar keySetRef
writeTVar keySetRef $! key `Set.delete` inFlight

data CachedVCClientError
= ClientVCStoreError VCServerError
| ClientServantError ClientError
| LocalVCStoreError VCStoreError
deriving (Show, Generic)

initVCCachedClient :: FilePath -> IO VCCacheEnv
initVCCachedClient cachePath = do
initVCCachedClient :: FilePath -> IOTracer VCCacheTrace -> IO VCCacheEnv
initVCCachedClient cachePath tracer = do
createDirectoryIfMissing True $ cachePath </> "deps"
cacheInFlight <- newTVarIO mempty
pure VCCacheEnv {cachePath, cacheInFlight}
cacheObjInFlight <- newTVarIO mempty
cacheDepInFlight <- newTVarIO mempty
pure VCCacheEnv {cachePath, cacheObjInFlight, cacheDepInFlight, tracer}

fetchVCObjectClosure ::
( MonadError err m,
Expand All @@ -103,82 +130,116 @@ fetchVCObjectClosure ::
VCObjectHash ->
m (Map.Map VCObjectHash (VCMeta a g VCObject))
fetchVCObjectClosure fetchVCObjects remoteFetchVCObjectClosureHashes objHash = do
env@VCCacheEnv {cachePath} <- asks getTyped
deps <-
withInFlight env [objHash] $
liftIO (doesFileExist $ cachePath </> "deps" </> show objHash) >>= \case
False -> do
deps <- liftServantClient $ remoteFetchVCObjectClosureHashes objHash
liftIO
$ atomicWriteFile
(cachePath </> "deps" </> show objHash)
$ BL.concat [BL.fromStrict (vcObjectHashToByteString h) <> "\n" | h <- deps]
pure deps
True -> fetchVCObjectClosureHashes objHash
withInFlight env deps $ do
(nonLocalHashes, localHashes) <-
partitionEithers
<$> forM
(objHash : deps)
( \depHash -> do
liftIO (doesFileExist $ cachePath </> show depHash) >>= \case
True -> pure $ Right depHash
False -> pure $ Left depHash
)
localObjs <-
Map.fromList
<$> forM
localHashes
( \h ->
(h,) <$> fetchVCObjectUnsafe h
)
VCCacheEnv {cacheObjInFlight, cacheDepInFlight} <- asks getTyped
deps <- withSingleConcurrentFetch cacheDepInFlight maybeReadCachedClosureHashes (fetchAndCacheClosureHashes remoteFetchVCObjectClosureHashes) objHash
-- shuffle scriptIds to improve concurrent performance when cache is cold
shuffledDeps <- liftIO $ shuffleM $ objHash : deps
mconcat
<$> mapM (withSingleConcurrentFetch cacheObjInFlight maybeReadCachedVCObject (fetchAndCacheVCObject fetchVCObjects)) shuffledDeps

nonLocalObjs <- liftServantClient $ fetchVCObjects nonLocalHashes
forM_ (Map.toList nonLocalObjs) $ \(h, o) ->
liftIO $ atomicWriteFile (cachePath </> show h) $ encode o
pure $ localObjs `Map.union` nonLocalObjs

fetchVCObjectClosureHashes ::
maybeReadCachedClosureHashes ::
( MonadError err m,
MonadIO m,
MonadReader env m,
HasType VCCacheEnv env,
AsType VCStoreError err,
HasType VCCacheEnv env
MonadReader env m,
MonadIO m,
MonadMask m
) =>
VCObjectHash ->
m [VCObjectHash]
fetchVCObjectClosureHashes h = do
VCCacheEnv {cachePath} <- asks getTyped
let fp = cachePath </> "deps" </> show h
readVCObjectHashTxt fp
m (Maybe [VCObjectHash])
maybeReadCachedClosureHashes objHash = do
VCCacheEnv {tracer} <- asks getTyped
tryJust (guard . isDoesNotExistError) readCachedClosureHashes >>= \case
Right deps ->
Just deps <$ traceWith tracer (VCCacheDepsHit objHash)
Left _ ->
Nothing <$ traceWith tracer (VCCacheDepsMiss objHash)
where
readCachedClosureHashes = do
path <- cachedDepsPath objHash
deps <- filter (not . B.null) . Char8.lines <$> liftIO (B.readFile path)
forM deps $ \dep -> do
decoded <-
either (const $ throwing _Typed $ InvalidHash $ Char8.unpack dep) pure $
Base64.decode dep
maybe (throwing _Typed $ InvalidHash $ Char8.unpack dep) (pure . VCObjectHash) $
digestFromByteString decoded

readVCObjectHashTxt ::
fetchAndCacheClosureHashes ::
( MonadError err m,
AsType VCStoreError err,
HasType VCCacheEnv env,
HasType ClientEnv env,
AsType VCServerError err,
AsType ClientError err,
MonadReader env m,
MonadIO m
) =>
FilePath ->
(VCObjectHash -> VCClient.ClientMWithVCStoreError [VCObjectHash]) ->
VCObjectHash ->
m [VCObjectHash]
readVCObjectHashTxt fp = do
deps <- filter (not . B.null) . Char8.lines <$> liftIO (B.readFile fp)
forM deps $ \dep -> do
decoded <- either (const $ throwing _Typed $ InvalidHash $ Char8.unpack dep) pure $ Base64.decode dep
maybe (throwing _Typed $ InvalidHash $ Char8.unpack dep) (pure . VCObjectHash) $ digestFromByteString decoded
fetchAndCacheClosureHashes remoteFetchVCObjectClosureHashes objHash = do
deps <- liftServantClient $ remoteFetchVCObjectClosureHashes objHash
path <- cachedDepsPath objHash
liftIO $
atomicWriteFile path $
BL.unlines $
map (BL.fromStrict . vcObjectHashToByteString) deps
pure deps

fetchVCObjectUnsafe ::
maybeReadCachedVCObject ::
( MonadReader r m,
HasType VCCacheEnv r,
MonadError e m,
AsType VCStoreError e,
MonadIO m,
MonadMask m,
FromJSON b
) =>
VCObjectHash ->
m b
fetchVCObjectUnsafe h = do
m (Maybe (Map.Map VCObjectHash b))
maybeReadCachedVCObject objHash = do
VCCacheEnv {tracer} <- asks getTyped
tryJust (guard . isDoesNotExistError) readCachedVCObject >>= \case
Left _ ->
Nothing <$ traceWith tracer (VCCacheMiss objHash)
Right obj ->
Just (Map.singleton objHash obj) <$ traceWith tracer (VCCacheHit objHash)
where
readCachedVCObject = do
path <- cachedObjPath objHash
either (throwing _Typed . CouldNotDecodeObject objHash) pure
=<< liftIO (eitherDecodeStrict <$> Char8.readFile path)

fetchAndCacheVCObject ::
( MonadError err m,
HasType VCCacheEnv env,
HasType ClientEnv env,
AsType VCServerError err,
AsType ClientError err,
MonadReader env m,
MonadIO m,
ToJSON a,
ToJSON g
) =>
([VCObjectHash] -> VCClient.ClientMWithVCStoreError (Map.Map VCObjectHash (VCMeta a g VCObject))) ->
VCObjectHash ->
m (Map.Map VCObjectHash (VCMeta a g VCObject))
fetchAndCacheVCObject fetchVCObjects objHash = do
objs <- liftServantClient $ fetchVCObjects [objHash]
forM_ (Map.toList objs) $ \(h, o) -> do
path <- cachedObjPath h
liftIO $ atomicWriteFile path $ encode o
pure objs

cachedDepsPath :: (MonadReader r m, HasType VCCacheEnv r) => VCObjectHash -> m FilePath
cachedDepsPath objHash = do
VCCacheEnv {cachePath} <- asks getTyped
pure $ cachePath </> "deps" </> show objHash

cachedObjPath :: (MonadReader r m, HasType VCCacheEnv r) => VCObjectHash -> m FilePath
cachedObjPath objHash = do
VCCacheEnv {cachePath} <- asks getTyped
let fp = cachePath </> show h
either (throwing _Typed . CouldNotDecodeObject h) pure =<< liftIO (eitherDecodeStrict <$> Char8.readFile fp)
pure $ cachePath </> show objHash

liftServantClient ::
( MonadError e m,
Expand Down
28 changes: 26 additions & 2 deletions inferno-vc/src/Inferno/VersionControl/Log.hs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
{-# LANGUAGE OverloadedStrings #-}

module Inferno.VersionControl.Log where
module Inferno.VersionControl.Log
( VCServerTrace (..),
VCCacheTrace (..),
vcServerTraceToText,
vcCacheTraceToText,
)
where

import Data.Text (Text, pack)
import Data.Text (Text, intercalate, pack)
import Inferno.VersionControl.Operations.Error (VCStoreError, vcStoreErrorToString)
import Inferno.VersionControl.Types (VCObjectHash)

data VCServerTrace
= ThrownVCStoreError VCStoreError
Expand All @@ -14,6 +21,8 @@ data VCServerTrace
| ReadJSON FilePath
| ReadTxt FilePath
| DeleteFile FilePath
| VCFetchObjects [VCObjectHash]
| VCFetchObjectClosureHashes VCObjectHash

vcServerTraceToText :: VCServerTrace -> Text
vcServerTraceToText = \case
Expand All @@ -25,3 +34,18 @@ vcServerTraceToText = \case
ThrownVCStoreError e -> pack (vcStoreErrorToString e)
ThrownVCOtherError e -> "Other server error: " <> e
DeleteFile fp -> "Deleting file: " <> pack fp
VCFetchObjects objs -> "FetchObjects " <> intercalate ", " (map (pack . show) objs)
VCFetchObjectClosureHashes obj -> "FetchObjectClosureHashes " <> pack (show obj)

data VCCacheTrace
= VCCacheHit VCObjectHash
| VCCacheMiss VCObjectHash
| VCCacheDepsHit VCObjectHash
| VCCacheDepsMiss VCObjectHash

vcCacheTraceToText :: VCCacheTrace -> Text
vcCacheTraceToText = \case
VCCacheHit h -> "VC Cache hit " <> pack (show h)
VCCacheMiss h -> "VC Cache miss " <> pack (show h)
VCCacheDepsHit h -> "VC Cache deps hit " <> pack (show h)
VCCacheDepsMiss h -> "VC Cache deps miss " <> pack (show h)
Loading
Loading