Skip to content

Commit

Permalink
[inferno-vc] Resurrect #139 and more granular locks in cached client (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
albertov authored Oct 14, 2024
1 parent fcb00f4 commit 0530836
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 59 deletions.
9 changes: 9 additions & 0 deletions inferno-vc/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
# Revision History for inferno-vc
*Note*: we use https://pvp.haskell.org/ (MAJOR.MAJOR.MINOR.PATCH)

## 0.3.8.0 -- 2024-10-14
* Made fetchObjectClosureHashes return the scriptId used to call it since it
also belongs in the closure.
* Added logging to cached client to see hits and misses
* Added logging to server to see what scriptIds are being used to request
fetchObjects and fetchObjectClosureHashes
* Made locks on cache more granular and only fetch a single upstream object per
request

## 0.3.7.1 -- 2024-09-23
* Fix overflowing threadDelay on armv7l

Expand Down
3 changes: 2 additions & 1 deletion inferno-vc/inferno-vc.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: >=1.10
name: inferno-vc
version: 0.3.7.1
version: 0.3.8.0
synopsis: Version control server for Inferno
description: A version control server for Inferno scripts
category: DSL,Scripting
Expand Down Expand Up @@ -69,6 +69,7 @@ library
, QuickCheck
, stm
, unbounded-delays
, random-shuffle

default-language: Haskell2010
default-extensions:
Expand Down
101 changes: 53 additions & 48 deletions inferno-vc/src/Inferno/VersionControl/Client/Cached.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ import Data.Aeson (FromJSON, ToJSON, eitherDecodeStrict, encode)
import qualified Data.ByteString as B
import qualified Data.ByteString.Base64.URL as Base64
import qualified Data.ByteString.Char8 as Char8
import qualified Data.ByteString.Lazy as BL
import Data.Either (partitionEithers)
import qualified Data.ByteString.Lazy.Char8 as BL
import Data.Generics.Product (HasType, getTyped)
import Data.Generics.Sum (AsType (..))
import Data.List (foldl')
import qualified Data.Map as Map
import qualified Data.Set as Set
import GHC.Generics (Generic)
import qualified Inferno.VersionControl.Client as VCClient
import Inferno.VersionControl.Log (VCCacheTrace (..))
import Inferno.VersionControl.Operations.Error (VCStoreError (..))
import Inferno.VersionControl.Server (VCServerError)
import Inferno.VersionControl.Types
Expand All @@ -44,44 +44,49 @@ import Inferno.VersionControl.Types
VCObjectHash (..),
vcObjectHashToByteString,
)
import Plow.Logging (IOTracer (..), traceWith)
import Servant.Client (ClientEnv, ClientError)
import Servant.Typed.Error (TypedClientM, runTypedClientM)
import System.AtomicWrite.Writer.LazyByteString (atomicWriteFile)
import System.Directory (createDirectoryIfMissing, doesFileExist)
import System.FilePath.Posix ((</>))
import System.Random.Shuffle (shuffleM)

data VCCacheEnv = VCCacheEnv
{ cachePath :: FilePath,
cacheInFlight :: TVar (Set.Set VCObjectHash)
cacheObjInFlight :: TVar (Set.Set VCObjectHash),
cacheDepInFlight :: TVar (Set.Set VCObjectHash),
tracer :: IOTracer VCCacheTrace
}
deriving (Generic)

-- | Makes sure only one thread at a time fetches the closure for certain
-- VCObjectHashes
withInFlight :: (MonadMask m, MonadIO m) => VCCacheEnv -> [VCObjectHash] -> m a -> m a
withInFlight VCCacheEnv {cacheInFlight} hashes = bracket_ acquire release
withInFlight :: (MonadMask m, MonadIO m) => TVar (Set.Set VCObjectHash) -> [VCObjectHash] -> m a -> m a
withInFlight hashSetRef hashes = bracket_ acquire release
where
acquire = liftIO $ atomically $ do
inFlight <- readTVar cacheInFlight
inFlight <- readTVar hashSetRef
if any (`Set.member` inFlight) hashes
then retry
else do
writeTVar cacheInFlight $! foldl' (flip Set.insert) inFlight hashes
writeTVar hashSetRef $! foldl' (flip Set.insert) inFlight hashes
release = liftIO $ atomically $ do
inFlight <- readTVar cacheInFlight
writeTVar cacheInFlight $! foldl' (flip Set.delete) inFlight hashes
inFlight <- readTVar hashSetRef
writeTVar hashSetRef $! foldl' (flip Set.delete) inFlight hashes

data CachedVCClientError
= ClientVCStoreError VCServerError
| ClientServantError ClientError
| LocalVCStoreError VCStoreError
deriving (Show, Generic)

initVCCachedClient :: FilePath -> IO VCCacheEnv
initVCCachedClient cachePath = do
createDirectoryIfMissing True $ cachePath </> "deps"
cacheInFlight <- newTVarIO mempty
pure VCCacheEnv {cachePath, cacheInFlight}
initVCCachedClient :: FilePath -> IOTracer VCCacheTrace -> IO VCCacheEnv
initVCCachedClient cachePath tracer = do
createDirectoryIfMissing True $ cachePath </> "deps-v2"
cacheObjInFlight <- newTVarIO mempty
cacheDepInFlight <- newTVarIO mempty
pure VCCacheEnv {cachePath, cacheObjInFlight, cacheDepInFlight, tracer}

fetchVCObjectClosure ::
( MonadError err m,
Expand All @@ -103,40 +108,36 @@ fetchVCObjectClosure ::
VCObjectHash ->
m (Map.Map VCObjectHash (VCMeta a g VCObject))
fetchVCObjectClosure fetchVCObjects remoteFetchVCObjectClosureHashes objHash = do
env@VCCacheEnv {cachePath} <- asks getTyped
VCCacheEnv {cachePath, cacheObjInFlight, cacheDepInFlight, tracer} <- asks getTyped
deps <-
withInFlight env [objHash] $
liftIO (doesFileExist $ cachePath </> "deps" </> show objHash) >>= \case
withInFlight cacheDepInFlight [objHash] $
liftIO (doesFileExist $ cachePath </> "deps-v2" </> show objHash) >>= \case
False -> do
traceWith tracer $ VCCacheDepsMiss objHash
deps <- liftServantClient $ remoteFetchVCObjectClosureHashes objHash
liftIO
$ atomicWriteFile
(cachePath </> "deps" </> show objHash)
$ BL.concat [BL.fromStrict (vcObjectHashToByteString h) <> "\n" | h <- deps]
liftIO $
atomicWriteFile (cachePath </> "deps-v2" </> show objHash) $
BL.unlines $
map (BL.fromStrict . vcObjectHashToByteString) deps
pure deps
True -> fetchVCObjectClosureHashes objHash
withInFlight env deps $ do
(nonLocalHashes, localHashes) <-
partitionEithers
<$> forM
(objHash : deps)
( \depHash -> do
liftIO (doesFileExist $ cachePath </> show depHash) >>= \case
True -> pure $ Right depHash
False -> pure $ Left depHash
)
localObjs <-
Map.fromList
<$> forM
localHashes
( \h ->
(h,) <$> fetchVCObjectUnsafe h
)

nonLocalObjs <- liftServantClient $ fetchVCObjects nonLocalHashes
forM_ (Map.toList nonLocalObjs) $ \(h, o) ->
liftIO $ atomicWriteFile (cachePath </> show h) $ encode o
pure $ localObjs `Map.union` nonLocalObjs
True -> do
traceWith tracer $ VCCacheDepsHit objHash
fetchVCObjectClosureHashes objHash
-- shuffle deps to improve concurrent performance
shuffledDeps <- liftIO $ shuffleM deps
fmap mconcat $
forM shuffledDeps $ \depHash ->
withInFlight cacheObjInFlight [depHash] $
liftIO (doesFileExist $ cachePath </> show depHash) >>= \case
True -> do
traceWith tracer (VCCacheHit depHash)
Map.singleton depHash <$> fetchVCObjectUnsafe depHash
False -> do
traceWith tracer (VCCacheMiss depHash)
objs <- liftServantClient $ fetchVCObjects [depHash]
forM_ (Map.toList objs) $ \(h, o) ->
liftIO $ atomicWriteFile (cachePath </> show h) $ encode o
pure objs

fetchVCObjectClosureHashes ::
( MonadError err m,
Expand All @@ -149,7 +150,7 @@ fetchVCObjectClosureHashes ::
m [VCObjectHash]
fetchVCObjectClosureHashes h = do
VCCacheEnv {cachePath} <- asks getTyped
let fp = cachePath </> "deps" </> show h
let fp = cachePath </> "deps-v2" </> show h
readVCObjectHashTxt fp

readVCObjectHashTxt ::
Expand All @@ -162,8 +163,11 @@ readVCObjectHashTxt ::
readVCObjectHashTxt fp = do
deps <- filter (not . B.null) . Char8.lines <$> liftIO (B.readFile fp)
forM deps $ \dep -> do
decoded <- either (const $ throwing _Typed $ InvalidHash $ Char8.unpack dep) pure $ Base64.decode dep
maybe (throwing _Typed $ InvalidHash $ Char8.unpack dep) (pure . VCObjectHash) $ digestFromByteString decoded
decoded <-
either (const $ throwing _Typed $ InvalidHash $ Char8.unpack dep) pure $
Base64.decode dep
maybe (throwing _Typed $ InvalidHash $ Char8.unpack dep) (pure . VCObjectHash) $
digestFromByteString decoded

fetchVCObjectUnsafe ::
( MonadReader r m,
Expand All @@ -178,7 +182,8 @@ fetchVCObjectUnsafe ::
fetchVCObjectUnsafe h = do
VCCacheEnv {cachePath} <- asks getTyped
let fp = cachePath </> show h
either (throwing _Typed . CouldNotDecodeObject h) pure =<< liftIO (eitherDecodeStrict <$> Char8.readFile fp)
either (throwing _Typed . CouldNotDecodeObject h) pure
=<< liftIO (eitherDecodeStrict <$> Char8.readFile fp)

liftServantClient ::
( MonadError e m,
Expand Down
28 changes: 26 additions & 2 deletions inferno-vc/src/Inferno/VersionControl/Log.hs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
{-# LANGUAGE OverloadedStrings #-}

module Inferno.VersionControl.Log where
module Inferno.VersionControl.Log
( VCServerTrace (..),
VCCacheTrace (..),
vcServerTraceToText,
vcCacheTraceToText,
)
where

import Data.Text (Text, pack)
import Data.Text (Text, intercalate, pack)
import Inferno.VersionControl.Operations.Error (VCStoreError, vcStoreErrorToString)
import Inferno.VersionControl.Types (VCObjectHash)

data VCServerTrace
= ThrownVCStoreError VCStoreError
Expand All @@ -14,6 +21,8 @@ data VCServerTrace
| ReadJSON FilePath
| ReadTxt FilePath
| DeleteFile FilePath
| VCFetchObjects [VCObjectHash]
| VCFetchObjectClosureHashes VCObjectHash

vcServerTraceToText :: VCServerTrace -> Text
vcServerTraceToText = \case
Expand All @@ -25,3 +34,18 @@ vcServerTraceToText = \case
ThrownVCStoreError e -> pack (vcStoreErrorToString e)
ThrownVCOtherError e -> "Other server error: " <> e
DeleteFile fp -> "Deleting file: " <> pack fp
VCFetchObjects objs -> "FetchObjects " <> intercalate ", " (map (pack . show) objs)
VCFetchObjectClosureHashes obj -> "FetchObjectClosureHashes " <> pack (show obj)

data VCCacheTrace
= VCCacheHit VCObjectHash
| VCCacheMiss VCObjectHash
| VCCacheDepsHit VCObjectHash
| VCCacheDepsMiss VCObjectHash

vcCacheTraceToText :: VCCacheTrace -> Text
vcCacheTraceToText = \case
VCCacheHit h -> "VC Cache hit " <> pack (show h)
VCCacheMiss h -> "VC Cache miss " <> pack (show h)
VCCacheDepsHit h -> "VC Cache deps hit " <> pack (show h)
VCCacheDepsMiss h -> "VC Cache deps miss " <> pack (show h)
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ instance
fetchVCObjectClosureHashes h = do
VCStorePath storePath <- asks getTyped
let fp = storePath </> "deps" </> show h
readVCObjectHashTxt fp
(h :) <$> readVCObjectHashTxt fp

deleteAutosavedVCObjectsOlderThan t = do
-- We know that all autosaves must be heads:
Expand Down
17 changes: 12 additions & 5 deletions inferno-vc/src/Inferno/VersionControl/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import Data.Time.Clock.POSIX (getPOSIXTime)
import GHC.Generics (Generic)
import Inferno.Types.Syntax (Expr)
import Inferno.Types.Type (TCScheme)
import Inferno.VersionControl.Log (VCServerTrace (ThrownVCOtherError, ThrownVCStoreError), vcServerTraceToText)
import Inferno.VersionControl.Log (VCServerTrace (..), vcServerTraceToText)
import qualified Inferno.VersionControl.Operations as Ops
import qualified Inferno.VersionControl.Operations.Error as Ops
import Inferno.VersionControl.Server.Types (readServerConfig)
Expand Down Expand Up @@ -100,14 +100,21 @@ vcServer ::
Ord (Ops.Group m)
) =>
(forall x. m x -> Handler (Union (WithError VCServerError x))) ->
IOTracer VCServerTrace ->
Server (VersionControlAPI (Ops.Author m) (Ops.Group m))
vcServer toHandler =
vcServer toHandler tracer =
toHandler . fetchFunctionH
:<|> toHandler . Ops.fetchFunctionsForGroups
:<|> toHandler . Ops.fetchVCObject
:<|> toHandler . Ops.fetchVCObjectHistory
:<|> toHandler . fetchVCObjects
:<|> toHandler . Ops.fetchVCObjectClosureHashes
:<|> ( \objs ->
traceWith tracer (VCFetchObjects objs)
>> toHandler (fetchVCObjects objs)
)
:<|> ( \obj ->
traceWith tracer (VCFetchObjectClosureHashes obj)
>> toHandler (Ops.fetchVCObjectClosureHashes obj)
)
:<|> toHandler . pushFunctionH
:<|> toHandler . Ops.deleteAutosavedVCObject
:<|> toHandler . Ops.deleteVCObjects
Expand Down Expand Up @@ -187,7 +194,7 @@ runServerConfig middleware withEnv runOp serverConfig = do
gzip def $
middleware env $
serve (Proxy :: Proxy (VersionControlAPI a g)) $
vcServer (liftIO . liftTypedError . flip runOp env)
vcServer (liftIO . liftTypedError . flip runOp env) serverTracer

withLinkedAsync_ :: IO a -> IO b -> IO b
withLinkedAsync_ f g = withAsync f $ \h -> link h >> g
4 changes: 2 additions & 2 deletions inferno-vc/src/Inferno/VersionControl/Testing.hs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ vcServerSpec url = do
metas <- runOperation vcClientEnv (fetchFunctionsForGroups (Set.singleton g))
map obj metas `shouldBe` [h4]

-- The closure of h4 should be empty as it has no dependencies:
-- The closure of h4 should only contain h4 as it has no dependencies:
metas' <- runOperation vcClientEnv (fetchVCObjectClosureHashes h4)
metas' `shouldBe` []
metas' `shouldBe` [h4]

-- After cloning h4 to h5, fetchFunctionsForGroups should return h4 and h5:
o5 <- createObjForGroup g VCObjectPublic $ CloneOf h4
Expand Down

0 comments on commit 0530836

Please sign in to comment.