From 71a2442f897da0ebd7d6aac2bfbf7ee6f0e4c52f Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Mon, 31 May 2021 12:26:27 +0200 Subject: [PATCH] feat(runtime): first steps towards adding timers, prng and get current time --- .../src/StuntDouble/ActorMap.hs | 58 +++++++++++++++---- 1 file changed, 46 insertions(+), 12 deletions(-) diff --git a/src/runtime-prototype/src/StuntDouble/ActorMap.hs b/src/runtime-prototype/src/StuntDouble/ActorMap.hs index 9e3aa3f3..4c1d4557 100644 --- a/src/runtime-prototype/src/StuntDouble/ActorMap.hs +++ b/src/runtime-prototype/src/StuntDouble/ActorMap.hs @@ -58,8 +58,9 @@ data ActorF x | On Promise (Resolution -> Free ActorF ()) (() -> x) | Get (State -> x) | Put State (() -> x) - -- XXX: Random? - -- XXX: GetTime? + | GetTime (UTCTime -> x) + | Random (Double -> x) + | SetTimer Time.NominalDiffTime (Promise -> x) -- XXX: Throw? deriving instance Functor ActorF @@ -84,6 +85,15 @@ put s' = Free (Put s' return) modify :: (State -> State) -> Free ActorF () modify f = put . f =<< get +getTime :: Free ActorF UTCTime +getTime = Free (GetTime return) + +random :: Free ActorF Double +random = Free (Random return) + +setTimer :: Time.NominalDiffTime -> Free ActorF Promise +setTimer ndt = Free (SetTimer ndt return) + ------------------------------------------------------------------------ newtype ActorMap = ActorMap (Map LocalRef (Message -> Actor, State)) @@ -110,6 +120,7 @@ data Action = SendAction LocalRef Message RemoteRef Promise | AsyncIOAction (IO IOResult) Promise | OnAction Promise (Resolution -> Free ActorF ()) LocalRef + | SetTimerAction Time.NominalDiffTime Promise -- XXX: what about exceptions? transactional in state, but also in actions?! actorMapTurn :: LocalRef -> Message -> ActorMap @@ -144,6 +155,16 @@ actorMapTurn' p acc lref (Free op) am = case op of case am of ActorMap m -> actorMapTurn' p acc lref (k ()) (ActorMap (Map.adjust (\(a, _s) -> (a, s')) lref m)) + GetTime k -> + -- XXX: Should time live in the actor map (so each actor can have a + -- different perception of what the time is), or should time live in the + -- event loop? The former can be useful for simulating time scews even + -- though all actors are run in the same event loop. + actorMapTurn' p acc lref (k (error "XXX: need current time here...")) am + Random k -> + error "XXX: need seed from `EventLoop`..." + SetTimer ndt k -> + actorMapTurn' (p + 1) (SetTimerAction ndt p : acc) lref (k p) am actorMapPeek :: LocalRef -> Message -> ActorMap -> (Message, ActorMap) actorMapPeek lref msg am = @@ -230,9 +251,13 @@ data AsyncState = AsyncState { asyncStateAsyncIO :: Map (Async IOResult) Promise , asyncStateContinuations :: Map Promise (Resolution -> Free ActorF (), LocalRef) , asyncStateAdminSend :: Map Promise (TMVar Message) - , asyncStateTimeouts :: Heap (Entry UTCTime Promise) + , asyncStateTimeouts :: Heap (Entry UTCTime (TimeoutKind, Promise)) } +data TimeoutKind + = SendTimeout + | TimerTimeout + emptyAsyncState :: AsyncState emptyAsyncState = AsyncState Map.empty Map.empty Map.empty Heap.empty @@ -242,6 +267,7 @@ madePromises = foldMap go go (SendAction _from _msg _to (Promise i)) = Set.singleton i go (AsyncIOAction _io (Promise i)) = Set.singleton i go OnAction {} = Set.empty + go (SetTimerAction _ndt (Promise i)) = Set.singleton i act :: EventLoopName -> [Action] -> Time -> Transport IO -> AsyncState -> IO AsyncState act name as time transport s0 = foldM go s0 as @@ -257,7 +283,7 @@ act name as time transport s0 = foldM go s0 as -- XXX: make it possible to specify when a send request should timeout. let timeoutAfter = Time.addUTCTime 60 t return s { asyncStateTimeouts = - Heap.insert (Entry timeoutAfter p) (asyncStateTimeouts s) } + Heap.insert (Entry timeoutAfter (SendTimeout, p)) (asyncStateTimeouts s) } go s (AsyncIOAction io p) = do -- XXX: Use `asyncOn` a different capability than main loop. a <- async io @@ -269,10 +295,15 @@ act name as time transport s0 = foldM go s0 as Map.insert p (k, lref) (asyncStateContinuations s) }) | otherwise = error "act: impossible, `On` must be supplied with a promise that was just made." + go s (SetTimerAction ndt p) = do + t <- getCurrentTime time + let timeoutAfter = Time.addUTCTime ndt t + return s { asyncStateTimeouts = + Heap.insert (Entry timeoutAfter (TimerTimeout, p)) (asyncStateTimeouts s) } data Reaction = Receive Promise Envelope - | SendTimeout (Free ActorF ()) LocalRef + | SendTimeoutReaction (Free ActorF ()) LocalRef | AsyncIOFinished Promise IOResult | AsyncIOFailed Promise SomeException @@ -300,7 +331,7 @@ react (Receive p e) s = (AdminSendResponse returnVar (envelopeMessage e), s { asyncStateAdminSend = Map.delete p (asyncStateAdminSend s) }) -react (SendTimeout a lref) s = (ResumeContinuation a lref, s) +react (SendTimeoutReaction a lref) s = (ResumeContinuation a lref, s) react (AsyncIOFinished p result) s = case Map.lookup p (asyncStateContinuations s) of Nothing -> @@ -426,7 +457,7 @@ handleTimeouts ls = forever go now <- getCurrentTime (lsTime ls) als <- atomically (stateTVar (lsAsyncState ls) (findTimedout now)) mapM_ (\(a, lref) -> - atomically (writeTBQueue (lsQueue ls) (Reaction (SendTimeout a lref)))) + atomically (writeTBQueue (lsQueue ls) (Reaction (SendTimeoutReaction a lref)))) als findTimedout :: UTCTime -> AsyncState @@ -434,16 +465,19 @@ findTimedout :: UTCTime -> AsyncState findTimedout now s = let (timedout, heap') = Heap.span (\(Entry t _p) -> t <= now) (asyncStateTimeouts s) - ps = map Heap.payload (toList timedout) - cs = catMaybes (map (\p -> Map.lookup p (asyncStateContinuations s)) ps) - als = map ((\(c, lref) -> (c (Left (error "XXX: timeout")), lref))) cs - ks = foldr Map.delete (asyncStateContinuations s) ps + ts = map Heap.payload (toList timedout) + cs = catMaybes (map (\(tk, p) -> + fmap (\c -> (tk, c)) (Map.lookup p (asyncStateContinuations s))) ts) + als = map ((\(tk, (c, lref)) -> case tk of + -- XXX: Introduce `Timer` in `Resolution`. + SendTimeout -> (c (Left (error "XXX: timeout")), lref) + TimerTimeout -> (c (Left (error "XXX: tick")), lref))) cs + ks = foldr Map.delete (asyncStateContinuations s) (map snd ts) in (als, s { asyncStateContinuations = ks , asyncStateTimeouts = heap' }) - handleEvents :: EventLoop -> IO () handleEvents ls = forever go where