Skip to content

Commit

Permalink
refactor(runtime): simplify async io slightly
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed May 25, 2021
1 parent a9ba383 commit 7beb822
Showing 1 changed file with 25 additions and 27 deletions.
52 changes: 25 additions & 27 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,14 @@ spawn ls a s = do
------------------------------------------------------------------------

data AsyncState = AsyncState
{ asyncStateAsyncIO :: Map (Async IOResult) Promise
{ asyncStateAsyncIO :: Set (Async (Promise, IOResult))
, asyncStateContinuations :: Map Promise (Either IOResult Message -> Free ActorF (),
LocalRef)
, asyncStateAdminSend :: Map Promise (TMVar Message)
}

emptyAsyncState :: AsyncState
emptyAsyncState = AsyncState Map.empty Map.empty Map.empty
emptyAsyncState = AsyncState Set.empty Map.empty Map.empty

madePromises :: [Action] -> Set Int
madePromises = foldMap go
Expand All @@ -174,8 +174,8 @@ madePromises = foldMap go
go (AsyncIOAction _io (Promise i)) = Set.singleton i
go OnAction {} = Set.empty

act :: EventLoopName -> [Action] -> AsyncState -> Transport IO -> IO AsyncState
act name as s0 t = foldM go s0 as
act :: EventLoopName -> [Action] -> Transport IO -> AsyncState -> IO AsyncState
act name as t s0 = foldM go s0 as
where
is :: Set Int
is = madePromises as
Expand All @@ -184,20 +184,22 @@ act name as s0 t = foldM go s0 as
go s (SendAction from msg to (Promise i)) = do
transportSend t
(Envelope RequestKind (localToRemoteRef name from) msg to (CorrelationId i))
return s -- XXX: make a note of when we sent so we can timeout.
-- XXX: make a note of when we sent so we can timeout.
return s
go s (AsyncIOAction io p) = do
a <- async io -- XXX: Use `asyncOn` a different capability than main loop.
return (s { asyncStateAsyncIO = Map.insert a p (asyncStateAsyncIO s) })
-- XXX: Use `asyncOn` a different capability than main loop.
a <- fmap (fmap (\x -> (p, x))) (async io)
return (s { asyncStateAsyncIO = Set.insert a (asyncStateAsyncIO s) })
go s (OnAction p@(Promise i) k lref)
| i `Set.member` is = do
| i `Set.member` is =
return (s { asyncStateContinuations =
Map.insert p (k, lref) (asyncStateContinuations s) })
| otherwise =
error "act: impossible, `On` must be supplied with a promise that was just made."

data Reaction
= Response Promise Message
| AsyncIOFinished (Async IOResult) IOResult
| AsyncIOFinished Promise IOResult

data ReactTask
= NothingToDo
Expand All @@ -220,18 +222,14 @@ react (Response p msg) s =
s { asyncStateAdminSend =
Map.delete p (asyncStateAdminSend s) })

react (AsyncIOFinished a result) s =
case Map.lookup a (asyncStateAsyncIO s) of
Nothing -> error "react: impossible, unknown async finished."
Just p -> case Map.lookup p (asyncStateContinuations s) of
Nothing ->
-- No continuation was registered for this async.
-- XXX: the async handler should take care for this map deletion...
(NothingToDo, s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) })
Just (k, lref) -> (ResumeContinuation (k (Left result)) lref,
s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s)
, asyncStateContinuations = Map.delete p (asyncStateContinuations s)
})
react (AsyncIOFinished p result) s =
case Map.lookup p (asyncStateContinuations s) of
Nothing ->
-- No continuation was registered for this async.
(NothingToDo, s)
Just (k, lref) -> (ResumeContinuation (k (Left result)) lref,
s { asyncStateContinuations =
Map.delete p (asyncStateContinuations s) })

reactIO :: Reaction -> TVar AsyncState -> IO ReactTask
reactIO r v = atomically (stateTVar v (react r))
Expand Down Expand Up @@ -296,11 +294,11 @@ handleAsyncIO ls = forever go
go = atomically $ do
-- XXX: Use waitAnyCatchSTM and handle exceptions appropriately here, e.g.
-- by extending `AsyncIOFinished` with `Fail` and `Info`.
as <- readTVar (lsAsyncState ls)
(a, ioResult) <- waitAnySTM (Map.keys (asyncStateAsyncIO as))
writeTBQueue (lsQueue ls) (Reaction (AsyncIOFinished a ioResult))
s <- readTVar (lsAsyncState ls)
(a, (p, ioResult)) <- waitAnySTM (Set.toList (asyncStateAsyncIO s))
writeTBQueue (lsQueue ls) (Reaction (AsyncIOFinished p ioResult))
writeTVar (lsAsyncState ls)
(as { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO as) })
(s { asyncStateAsyncIO = Set.delete a (asyncStateAsyncIO s) })

handleEvents :: EventLoop -> IO ()
handleEvents ls = forever go
Expand All @@ -317,7 +315,7 @@ handleEvent (Action a) ls = do
-- XXX: Non-atomic update of `lsAsyncState`, should be fixed...
-- XXX
s <- readTVarIO (lsAsyncState ls)
s' <- act (lsName ls) [a] s (lsTransport ls)
s' <- act (lsName ls) [a] (lsTransport ls) s
atomically (writeTVar (lsAsyncState ls) s')
handleEvent (Reaction r) ls = do
m <- reactIO r (lsAsyncState ls)
Expand All @@ -335,7 +333,7 @@ handleEvent (Reaction r) ls = do
-- XXX: Non-atomic update of `lsAsyncState`, should be fixed...
-- XXX
s <- readTVarIO (lsAsyncState ls)
s' <- act (lsName ls) as s (lsTransport ls)
s' <- act (lsName ls) as (lsTransport ls) s
atomically (writeTVar (lsAsyncState ls) s')
AdminSendResponse returnVar msg ->
atomically (putTMVar returnVar msg)
Expand Down

0 comments on commit 7beb822

Please sign in to comment.