Skip to content

Commit

Permalink
feat(runtime): first steps towards adding timers, prng and get curren…
Browse files Browse the repository at this point in the history
…t time
  • Loading branch information
symbiont-stevan-andjelkovic committed May 31, 2021
1 parent ec2bd05 commit 71a2442
Showing 1 changed file with 46 additions and 12 deletions.
58 changes: 46 additions & 12 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ data ActorF x
| On Promise (Resolution -> Free ActorF ()) (() -> x)
| Get (State -> x)
| Put State (() -> x)
-- XXX: Random?
-- XXX: GetTime?
| GetTime (UTCTime -> x)
| Random (Double -> x)
| SetTimer Time.NominalDiffTime (Promise -> x)
-- XXX: Throw?
deriving instance Functor ActorF

Expand All @@ -84,6 +85,15 @@ put s' = Free (Put s' return)
modify :: (State -> State) -> Free ActorF ()
modify f = put . f =<< get

getTime :: Free ActorF UTCTime
getTime = Free (GetTime return)

random :: Free ActorF Double
random = Free (Random return)

setTimer :: Time.NominalDiffTime -> Free ActorF Promise
setTimer ndt = Free (SetTimer ndt return)

------------------------------------------------------------------------

newtype ActorMap = ActorMap (Map LocalRef (Message -> Actor, State))
Expand All @@ -110,6 +120,7 @@ data Action
= SendAction LocalRef Message RemoteRef Promise
| AsyncIOAction (IO IOResult) Promise
| OnAction Promise (Resolution -> Free ActorF ()) LocalRef
| SetTimerAction Time.NominalDiffTime Promise

-- XXX: what about exceptions? transactional in state, but also in actions?!
actorMapTurn :: LocalRef -> Message -> ActorMap
Expand Down Expand Up @@ -144,6 +155,16 @@ actorMapTurn' p acc lref (Free op) am = case op of
case am of
ActorMap m ->
actorMapTurn' p acc lref (k ()) (ActorMap (Map.adjust (\(a, _s) -> (a, s')) lref m))
GetTime k ->
-- XXX: Should time live in the actor map (so each actor can have a
-- different perception of what the time is), or should time live in the
-- event loop? The former can be useful for simulating time scews even
-- though all actors are run in the same event loop.
actorMapTurn' p acc lref (k (error "XXX: need current time here...")) am
Random k ->
error "XXX: need seed from `EventLoop`..."
SetTimer ndt k ->
actorMapTurn' (p + 1) (SetTimerAction ndt p : acc) lref (k p) am

actorMapPeek :: LocalRef -> Message -> ActorMap -> (Message, ActorMap)
actorMapPeek lref msg am =
Expand Down Expand Up @@ -230,9 +251,13 @@ data AsyncState = AsyncState
{ asyncStateAsyncIO :: Map (Async IOResult) Promise
, asyncStateContinuations :: Map Promise (Resolution -> Free ActorF (), LocalRef)
, asyncStateAdminSend :: Map Promise (TMVar Message)
, asyncStateTimeouts :: Heap (Entry UTCTime Promise)
, asyncStateTimeouts :: Heap (Entry UTCTime (TimeoutKind, Promise))
}

data TimeoutKind
= SendTimeout
| TimerTimeout

emptyAsyncState :: AsyncState
emptyAsyncState = AsyncState Map.empty Map.empty Map.empty Heap.empty

Expand All @@ -242,6 +267,7 @@ madePromises = foldMap go
go (SendAction _from _msg _to (Promise i)) = Set.singleton i
go (AsyncIOAction _io (Promise i)) = Set.singleton i
go OnAction {} = Set.empty
go (SetTimerAction _ndt (Promise i)) = Set.singleton i

act :: EventLoopName -> [Action] -> Time -> Transport IO -> AsyncState -> IO AsyncState
act name as time transport s0 = foldM go s0 as
Expand All @@ -257,7 +283,7 @@ act name as time transport s0 = foldM go s0 as
-- XXX: make it possible to specify when a send request should timeout.
let timeoutAfter = Time.addUTCTime 60 t
return s { asyncStateTimeouts =
Heap.insert (Entry timeoutAfter p) (asyncStateTimeouts s) }
Heap.insert (Entry timeoutAfter (SendTimeout, p)) (asyncStateTimeouts s) }
go s (AsyncIOAction io p) = do
-- XXX: Use `asyncOn` a different capability than main loop.
a <- async io
Expand All @@ -269,10 +295,15 @@ act name as time transport s0 = foldM go s0 as
Map.insert p (k, lref) (asyncStateContinuations s) })
| otherwise =
error "act: impossible, `On` must be supplied with a promise that was just made."
go s (SetTimerAction ndt p) = do
t <- getCurrentTime time
let timeoutAfter = Time.addUTCTime ndt t
return s { asyncStateTimeouts =
Heap.insert (Entry timeoutAfter (TimerTimeout, p)) (asyncStateTimeouts s) }

data Reaction
= Receive Promise Envelope
| SendTimeout (Free ActorF ()) LocalRef
| SendTimeoutReaction (Free ActorF ()) LocalRef
| AsyncIOFinished Promise IOResult
| AsyncIOFailed Promise SomeException

Expand Down Expand Up @@ -300,7 +331,7 @@ react (Receive p e) s =
(AdminSendResponse returnVar (envelopeMessage e),
s { asyncStateAdminSend =
Map.delete p (asyncStateAdminSend s) })
react (SendTimeout a lref) s = (ResumeContinuation a lref, s)
react (SendTimeoutReaction a lref) s = (ResumeContinuation a lref, s)
react (AsyncIOFinished p result) s =
case Map.lookup p (asyncStateContinuations s) of
Nothing ->
Expand Down Expand Up @@ -426,24 +457,27 @@ handleTimeouts ls = forever go
now <- getCurrentTime (lsTime ls)
als <- atomically (stateTVar (lsAsyncState ls) (findTimedout now))
mapM_ (\(a, lref) ->
atomically (writeTBQueue (lsQueue ls) (Reaction (SendTimeout a lref))))
atomically (writeTBQueue (lsQueue ls) (Reaction (SendTimeoutReaction a lref))))
als

findTimedout :: UTCTime -> AsyncState
-> ([(Free ActorF (), LocalRef)], AsyncState)
findTimedout now s =
let
(timedout, heap') = Heap.span (\(Entry t _p) -> t <= now) (asyncStateTimeouts s)
ps = map Heap.payload (toList timedout)
cs = catMaybes (map (\p -> Map.lookup p (asyncStateContinuations s)) ps)
als = map ((\(c, lref) -> (c (Left (error "XXX: timeout")), lref))) cs
ks = foldr Map.delete (asyncStateContinuations s) ps
ts = map Heap.payload (toList timedout)
cs = catMaybes (map (\(tk, p) ->
fmap (\c -> (tk, c)) (Map.lookup p (asyncStateContinuations s))) ts)
als = map ((\(tk, (c, lref)) -> case tk of
-- XXX: Introduce `Timer` in `Resolution`.
SendTimeout -> (c (Left (error "XXX: timeout")), lref)
TimerTimeout -> (c (Left (error "XXX: tick")), lref))) cs
ks = foldr Map.delete (asyncStateContinuations s) (map snd ts)
in
(als, s { asyncStateContinuations = ks
, asyncStateTimeouts = heap'
})


handleEvents :: EventLoop -> IO ()
handleEvents ls = forever go
where
Expand Down

0 comments on commit 71a2442

Please sign in to comment.