Skip to content

Commit

Permalink
api: introduce ElasticError
Browse files Browse the repository at this point in the history
This change catches error produced by the elastic client so that they can
be cleanly handled by the API handlers. Though this change keeps the "call
error on failure" behavior using the `abortOnEsError` helper.
  • Loading branch information
TristanCacqueray committed Jan 5, 2024
1 parent 72bf345 commit ff71f31
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 69 deletions.
2 changes: 1 addition & 1 deletion src/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ usageJanitor =
(eitherReader $ (first T.unpack . Config.mkIndexName) . T.pack)
(long "workspace" <> O.help "Workspace name" <> metavar "WORKSPACE")
crawlerNameOption = strOption (long "crawler-name" <> O.help "The crawler name" <> metavar "CRAWLER_NAME")
runOnWorkspace env action' workspace = runEff $ runLoggerEffect $ runElasticEffect env $ runEmptyQueryM workspace action'
runOnWorkspace env action' workspace = runEff $ runLoggerEffect $ runElasticEffect env $ runEmptyQueryM workspace $ abortOnEsError action'
noWorkspace workspaceName = "Unable to find the workspace " <> Config.getIndexName workspaceName <> " in the Monocle config"
janitorUpdateIdent = io <$> parser
where
Expand Down
15 changes: 6 additions & 9 deletions src/Database/Bloodhound/Raw.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module Database.Bloodhound.Raw (
mkTermsCompositeAgg,
) where

import Control.Monad.Catch (MonadThrow)
import Control.Monad.Catch (MonadThrow, throwM)
import Data.Aeson
import Data.Aeson qualified as Aeson
import Data.Aeson.Casing.Internal qualified as AesonCasing
Expand Down Expand Up @@ -62,11 +62,11 @@ advance :: (MonadBH m, MonadThrow m, FromJSON resp) => BH.ScrollId -> m (BH.Sear
advance scroll = do
resp <- BH.advanceScroll scroll 60
case resp of
Left e -> handleError e
Left err -> throwEsError mempty err
Right x -> pure x
where
handleError resp = do
error $ "Elastic scroll response failed" <> show resp

throwEsError :: MonadThrow m => LByteString -> BH.EsError -> m a
throwEsError resp err = throwM $ BH.EsProtocolException err.errorMessage resp

settings :: (MonadBH m, ToJSON body) => BH.IndexName -> body -> m ()
settings (BH.IndexName index) body = do
Expand Down Expand Up @@ -104,7 +104,7 @@ search index body scrollRequest = do
rawResp <- search' index newBody qs
resp <- BH.parseEsResponse rawResp
case resp of
Left e -> handleError e rawResp
Left err -> throwEsError mempty err
Right x -> pure x
where
newBody = case (fields, toJSON body) of
Expand All @@ -125,9 +125,6 @@ search index body scrollRequest = do
qs = case scrollRequest of
NoScroll -> []
GetScroll x -> [("scroll", Just x)]
handleError _resp rawResp = do
-- logWarn "Elastic response failed" ["status" .= BH.errorStatus resp, "message" .= BH.errorMessage resp]
error $ "Elastic response failed: " <> show rawResp

-- | A special purpose search implementation that uses the faster json-syntax
searchHit ::
Expand Down
4 changes: 2 additions & 2 deletions src/Macroscope/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ runLentilleM client action = do
testCrawlingPoint :: Assertion
testCrawlingPoint = do
appEnv <- mkAppEnv fakeConfig
runAppEnv appEnv $ runEmptyQueryM fakeConfig do
runAppEnv appEnv $ runEmptyQueryM fakeConfig $ abortOnEsError do
I.ensureIndexSetup
let fakeChange1 =
BT.fakeChange
Expand Down Expand Up @@ -124,7 +124,7 @@ testTaskDataMacroscope = withTestApi appEnv $ \client -> testAction client
| otherwise = error $ "Unexpected product entity: " <> show project
void $ runLentilleM client $ Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.TaskDatas stream)
-- Check task data got indexed
withTenantConfig fakeConfig do
withTenantConfig fakeConfig $ abortOnEsError do
count <- withQuery taskDataQuery $ Streaming.length_ Q.scanSearchId
liftIO (assertEqual "Task data got indexed by macroscope" count 1)

Expand Down
25 changes: 13 additions & 12 deletions src/Monocle/Api/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ updateIndex index wsRef = E.modifyMVar_ wsRef doUpdateIfNeeded
Nothing -> error $ "Unknown workspace: " <> show (Config.getWorkspaceName index)

refreshIndex :: Eff es ()
refreshIndex = do
refreshIndex = abortOnEsError do
logInfo "RefreshIndex" ["index" .= Config.getWorkspaceName index]
runRetry I.ensureIndexSetup
traverse_ I.initCrawlerMetadata index.crawlers
Expand Down Expand Up @@ -171,7 +171,7 @@ loginLoginValidation _auth request = do
validateOnIndex :: Text -> Config.Index -> MaybeT (Eff es) ()
validateOnIndex username index = do
let userQuery = Q.toUserTerm username
count <- lift $ runEmptyQueryM index $ withFilter [userQuery] Q.countDocs
count <- lift $ abortOnEsError $ runEmptyQueryM index $ withFilter [userQuery] Q.countDocs
when (count > 0) mzero

-- | /api/2/about endpoint
Expand Down Expand Up @@ -314,7 +314,7 @@ crawlerAddDoc _auth request = do
pure (index, crawler)

case requestE of
Right (index, crawler) -> runEmptyQueryM index do
Right (index, crawler) -> runEmptyQueryM index $ abortOnEsError do
unless (V.null errors) do
addErrors crawlerName (toEntity entity) errors
case toEntity entity of
Expand Down Expand Up @@ -405,7 +405,7 @@ crawlerCommit _auth request = do
pure (index, ts, toEntity entityPB)

case requestE of
Right (index, ts, entity) -> runEmptyQueryM index $ do
Right (index, ts, entity) -> runEmptyQueryM index $ abortOnEsError $ do
let date = Timestamp.toUTCTime ts
logInfo "UpdatingEntity" ["crawler" .= crawlerName, "entity" .= entity, "date" .= date]
-- TODO: check for CommitDateInferiorThanPrevious
Expand Down Expand Up @@ -445,7 +445,7 @@ crawlerCommitInfo _auth request = do

case requestE of
Right (index, worker, entityType) -> do
runEmptyQueryM index $ do
runEmptyQueryM index $ abortOnEsError do
updateIndex index wsStatus
toUpdateEntityM <- I.getLastUpdated worker (fromPBEnum entityType) offset
case toUpdateEntityM of
Expand Down Expand Up @@ -487,7 +487,7 @@ searchSuggestions auth request = checkAuth auth . const $ do
case tenantM of
Just tenant -> do
now <- getCurrentTime
runQueryM tenant (emptyQ now) $ Q.getSuggestions tenant
runQueryM tenant (emptyQ now) $ abortOnEsError $ Q.getSuggestions tenant
Nothing ->
-- Simply return empty suggestions in case of unknown tenant
pure
Expand Down Expand Up @@ -531,7 +531,7 @@ searchAuthor auth request = checkAuth auth . const $ do
authorAliases = V.fromList $ from <$> aliases
authorGroups = V.fromList $ from <$> fromMaybe mempty groups
in SearchPB.Author {..}
found <- runEmptyQueryM index $ I.searchAuthorCache . from $ authorRequestQuery
found <- runEmptyQueryM index $ abortOnEsError $ I.searchAuthorCache . from $ authorRequestQuery
pure $ toSearchAuthor <$> found
Nothing -> pure []

Expand Down Expand Up @@ -570,7 +570,7 @@ crawlerErrors auth request = checkAuth auth response
requestE <- validateSearchRequest request.errorsRequestIndex request.errorsRequestQuery "nobody"

case requestE of
Right (tenant, query) -> runQueryM tenant (Q.ensureMinBound query) $ do
Right (tenant, query) -> runQueryM tenant (Q.ensureMinBound query) $ abortOnEsError do
logInfo "ListingErrors" ["index" .= request.errorsRequestIndex]
errors <- toErrorsList <$> Q.crawlerErrors
pure $ CrawlerPB.ErrorsResponse $ Just $ CrawlerPB.ErrorsResponseResultSuccess $ CrawlerPB.ErrorsList $ fromList errors
Expand Down Expand Up @@ -611,7 +611,7 @@ searchQuery auth request = checkAuth auth response
requestE <- validateSearchRequest queryRequestIndex queryRequestQuery username

case requestE of
Right (tenant, query) -> runQueryM tenant (Q.ensureMinBound query) $ do
Right (tenant, query) -> runQueryM tenant (Q.ensureMinBound query) $ abortOnEsError do
let queryType = fromPBEnum queryRequestQueryType
logInfo
"Searching"
Expand Down Expand Up @@ -946,9 +946,10 @@ metricGet auth request = checkAuth auth response
-- Unknown query
_ -> handleError $ "Unknown metric: " <> from getRequestMetric
where
runM :: Eff (MonoQuery : es) a -> Eff es a
runM = runQueryM tenant (Q.ensureMinBound query)
runMetric :: (TrendPB a, TopPB a, NumPB a) => Q.Metric (MonoQuery : es) a -> Eff es MetricPB.GetResponse
runM :: Eff (MonoQuery : Error ElasticError : es) a -> Eff es a
runM = abortOnEsError . runQueryM tenant (Q.ensureMinBound query)

runMetric :: (TrendPB a, TopPB a, NumPB a) => Q.Metric (MonoQuery : Error ElasticError : es) a -> Eff es MetricPB.GetResponse
runMetric m = case getRequestOptions of
Just (MetricPB.GetRequestOptionsTrend (MetricPB.Trend interval)) ->
toTrendResult <$> runM (Q.runMetricTrend m $ fromPBTrendInterval $ from interval)
Expand Down
4 changes: 2 additions & 2 deletions src/Monocle/Api/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import Servant.Auth.Server (
generateKey,
)

import Database.Bloodhound qualified as BH
import Database.Bloodhound qualified as BH (BHEnv)
import Effectful.Error.Static qualified as E
import Effectful.Fail qualified as E
import Effectful.Reader.Static qualified as E
Expand Down Expand Up @@ -69,7 +69,7 @@ withTestApi appEnv' testCb = bracket appEnv' cleanIndex runTest
jwtCfg = appEnv.aOIDC.localJWTSettings
cfg = jwtCfg :. cookieCfg :. EmptyContext
traverse_
(\index -> runEmptyQueryM index I.ensureIndex)
(\index -> abortOnEsError $ runEmptyQueryM index I.ensureIndex)
indexes
unsafeEff $ \es ->
let app = Effectful.Servant.hoistEff @RootAPI es cfg (rootServer cookieCfg)
Expand Down
6 changes: 3 additions & 3 deletions src/Monocle/Backend/Index.hs
Original file line number Diff line number Diff line change
Expand Up @@ -465,14 +465,14 @@ ensureConfigIndex = do
traverseWorkspace action conf = do
traverse_ (\ws -> localQueryTarget (QueryWorkspace ws) action) (Config.getWorkspaces conf)

ensureIndexSetup :: (MonoQuery :> es, LoggerEffect :> es, ElasticEffect :> es, Retry :> es) => Eff es ()
ensureIndexSetup :: (MonoQuery :> es, LoggerEffect :> es, Error ElasticError :> es, ElasticEffect :> es, Retry :> es) => Eff es ()
ensureIndexSetup = do
indexName <- getIndexName
logInfo "Ensure workspace " ["index" .= indexName]
createIndex indexName ChangesIndexMapping
esSettings indexName (object ["index" .= object ["max_regex_length" .= (50_000 :: Int)]])

ensureIndexCrawlerMetadata :: (E.Fail :> es, LoggerEffect :> es, ElasticEffect :> es, MonoQuery :> es) => Eff es ()
ensureIndexCrawlerMetadata :: (E.Fail :> es, LoggerEffect :> es, Error ElasticError :> es, ElasticEffect :> es, MonoQuery :> es) => Eff es ()
ensureIndexCrawlerMetadata = do
QueryWorkspace config <- getQueryTarget
traverse_ initCrawlerMetadata config.crawlers
Expand All @@ -485,7 +485,7 @@ withRefresh action = do
refreshResp <- esRefreshIndex index
unless (BH.isSuccess refreshResp) (error $ "Unable to refresh index: " <> show resp)

ensureIndex :: (E.Fail :> es, LoggerEffect :> es, MonoQuery :> es, ElasticEffect :> es, Retry :> es) => Eff es ()
ensureIndex :: (E.Fail :> es, LoggerEffect :> es, MonoQuery :> es, Error ElasticError :> es, ElasticEffect :> es, Retry :> es) => Eff es ()
ensureIndex = do
ensureIndexSetup
ensureIndexCrawlerMetadata
Expand Down
4 changes: 2 additions & 2 deletions src/Monocle/Backend/Provisioner.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import Google.Protobuf.Timestamp qualified (fromUTCTime)
import Monocle.Backend.Documents
import Monocle.Backend.Test qualified as T
import Monocle.Config (csConfig, getWorkspaces, lookupTenant, mkIndexName)
import Monocle.Effects (getReloadConfig, runElasticEffect, runEmptyQueryM, runMonoConfig)
import Monocle.Effects (getReloadConfig, runElasticEffect, runEmptyQueryM, runMonoConfig, abortOnEsError)
import Monocle.Env (mkEnv)
import Monocle.Prelude
import Monocle.Protob.Search (TaskData (..))
Expand All @@ -50,7 +50,7 @@ runProvisioner configPath elasticUrl tenantName docCount = do
case tenantM of
Just tenant -> do
bhEnv <- mkEnv elasticUrl
r <- runRetry $ runFail $ runElasticEffect bhEnv $ do
r <- runRetry $ runFail $ runElasticEffect bhEnv $ abortOnEsError do
events <- liftIO $ createFakeEvents docCount
runEmptyQueryM tenant $ T.indexScenario events
logInfo "Provisionned" ["index" .= indexName, "doc count" .= length events]
Expand Down
2 changes: 1 addition & 1 deletion src/Monocle/Backend/Queries.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1778,7 +1778,7 @@ allMetrics :: [MetricInfo]
allMetrics =
map
metricInfo
[ toJSON <$> metricChangesCreated @[ElasticEffect, LoggerEffect, MonoQuery]
[ toJSON <$> metricChangesCreated @[ElasticEffect, Error ElasticError, LoggerEffect, MonoQuery]
, toJSON <$> metricChangesMerged
, toJSON <$> metricChangesAbandoned
, toJSON <$> metricChangesSelfMerged
Expand Down
Loading

0 comments on commit ff71f31

Please sign in to comment.