Skip to content

Commit

Permalink
refactor(runtime): finish admin send
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed May 25, 2021
1 parent c428f84 commit a9ba383
Showing 1 changed file with 98 additions and 59 deletions.
157 changes: 98 additions & 59 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import StuntDouble.Reference
newtype Promise = Promise Int
deriving (Eq, Ord, Num)

unPromise :: Promise -> Int
unPromise (Promise i) = i

newtype Actor = Actor { unActor :: Free ActorF Message }

data ActorF x
Expand Down Expand Up @@ -72,97 +75,97 @@ data Action
| OnAction Promise (Either IOResult Message -> Free ActorF ()) LocalRef

-- XXX: what about exceptions? transactional in state, but also in actions?!
actorMapTurn :: LocalRef -> Message -> ActorMap -> ((Message, ActorMap, [Action]), ActorMap)
actorMapTurn :: LocalRef -> Message -> ActorMap
-> ((Message, Promise, ActorMap, [Action]), ActorMap)
actorMapTurn lref0 msg0 am0 =
let
a = fst (actorMapUnsafeLookup lref0 am0)
in
-- XXX: Promises should not always start from 0, or they will overlap each
-- other if more than one turn happens...
(actorMapTurn' 0 [] lref0 (unActor (a msg0)) am0, am0)
(actorMapTurn' (Promise 0) [] lref0 (unActor (a msg0)) am0, am0)

actorMapTurn' :: Int -> [Action] -> LocalRef -> Free ActorF a -> ActorMap
-> (a, ActorMap, [Action])
actorMapTurn' _pc acc _lref (Pure msg) am = (msg, am, reverse acc)
actorMapTurn' pc acc lref (Free op) am = case op of
actorMapTurn' :: Promise -> [Action] -> LocalRef -> Free ActorF a -> ActorMap
-> (a, Promise, ActorMap, [Action])
actorMapTurn' p acc _lref (Pure msg) am = (msg, p, am, reverse acc)
actorMapTurn' p acc lref (Free op) am = case op of
Invoke lref' msg k ->
let
a' = fst (actorMapUnsafeLookup lref' am)
(reply, am', acc') = actorMapTurn' pc acc lref' (unActor (a' msg)) am
(reply, p', am', acc') = actorMapTurn' p acc lref' (unActor (a' msg)) am
in
actorMapTurn' pc acc' lref (k reply) am'
actorMapTurn' p' acc' lref (k reply) am'
Send rref msg k ->
let
p = Promise pc
in
actorMapTurn' (pc + 1) (SendAction lref msg rref p : acc) lref (k p) am
actorMapTurn' (p + 1) (SendAction lref msg rref p : acc) lref (k p) am
AsyncIO io k ->
let
p = Promise pc
in
actorMapTurn' (pc + 1) (AsyncIOAction io p : acc) lref (k p) am
On p c k ->
actorMapTurn' pc (OnAction p c lref : acc) lref (k ()) am
actorMapTurn' (p + 1) (AsyncIOAction io p : acc) lref (k p) am
On q c k ->
actorMapTurn' p (OnAction q c lref : acc) lref (k ()) am
Get k ->
actorMapTurn' pc acc lref (k (snd (actorMapUnsafeLookup lref am))) am
actorMapTurn' p acc lref (k (snd (actorMapUnsafeLookup lref am))) am
Put s' k ->
case am of
ActorMap m ->
actorMapTurn' pc acc lref (k ()) (ActorMap (Map.adjust (\(a, _s) -> (a, s')) lref m))
actorMapTurn' p acc lref (k ()) (ActorMap (Map.adjust (\(a, _s) -> (a, s')) lref m))

actorMapPeek :: LocalRef -> Message -> ActorMap -> (Message, ActorMap)
actorMapPeek lref msg am =
let
((reply, _am', _as), _am) = actorMapTurn lref msg am
((reply, _p, _am', _as), _am) = actorMapTurn lref msg am
in
(reply, am)

actorMapPoke :: LocalRef -> Message -> ActorMap -> (Message, ActorMap)
actorMapPoke lref msg am =
let
((reply, am', _as), _am) = actorMapTurn lref msg am
((reply, _p, am', _as), _am) = actorMapTurn lref msg am
in
(reply, am')

------------------------------------------------------------------------

type ActorMapTVar = TVar ActorMap

makeActorMapIO :: IO ActorMapTVar
makeActorMapIO :: IO (TVar ActorMap)
makeActorMapIO = newTVarIO emptyActorMap

actorMapSpawnIO :: (Message -> Actor) -> State -> ActorMapTVar -> IO LocalRef
actorMapSpawnIO :: (Message -> Actor) -> State -> TVar ActorMap -> IO LocalRef
actorMapSpawnIO a s am = atomically (stateTVar am (actorMapSpawn a s))

actorMapTurnIO :: LocalRef -> Message -> ActorMapTVar -> IO (Message, ActorMap, [Action])
actorMapTurnIO :: LocalRef -> Message -> TVar ActorMap
-> IO (Message, Promise, ActorMap, [Action])
actorMapTurnIO lref msg am = atomically (stateTVar am (actorMapTurn lref msg))

actorMapPeekIO :: LocalRef -> Message -> ActorMapTVar -> IO Message
actorMapPeekIO :: LocalRef -> Message -> TVar ActorMap -> IO Message
actorMapPeekIO lref msg am = atomically (stateTVar am (actorMapPeek lref msg))

actorMapPokeIO :: LocalRef -> Message -> ActorMapTVar -> IO Message
actorMapPokeIO :: LocalRef -> Message -> TVar ActorMap -> IO Message
actorMapPokeIO lref msg am = atomically (stateTVar am (actorMapPoke lref msg))

------------------------------------------------------------------------

devSend :: {- EventLoopRef-} RemoteRef -> Message -> IO (Async Message)
devSend = undefined
-- p <- createPromise
-- v <- newEmptyTMVar
-- insertDeveloperSend p v
-- async (atomically (takeTMVar v))
send :: EventLoop -> RemoteRef -> Message -> IO (Async Message)
send ls rref msg = do
p <- atomically (stateTVar (lsNextPromise ls) (\p -> (p, p + 1)))
returnVar <- newEmptyTMVarIO
atomically (writeTBQueue (lsQueue ls) (Admin (AdminSend rref msg p returnVar)))
async (atomically (takeTMVar returnVar))

spawn :: EventLoop -> (Message -> Actor) -> State -> IO LocalRef
spawn ls a s = do
returnVar <- newEmptyTMVarIO
atomically (writeTBQueue (lsQueue ls) (Admin (Spawn a s returnVar)))
atomically (takeTMVar returnVar)

------------------------------------------------------------------------

data AsyncState = AsyncState
{ asyncStateAsyncIO :: Map (Async IOResult) Promise
, asyncStateContinuations :: Map Promise (Either IOResult Message -> Free ActorF (),
LocalRef)
-- , asyncStateDeveloperSend :: Map Promise (TMVar Message)
{ asyncStateAsyncIO :: Map (Async IOResult) Promise
, asyncStateContinuations :: Map Promise (Either IOResult Message -> Free ActorF (),
LocalRef)
, asyncStateAdminSend :: Map Promise (TMVar Message)
}

emptyAsyncState :: AsyncState
emptyAsyncState = AsyncState Map.empty Map.empty
emptyAsyncState = AsyncState Map.empty Map.empty Map.empty

madePromises :: [Action] -> Set Int
madePromises = foldMap go
Expand Down Expand Up @@ -196,46 +199,63 @@ data Reaction
= Response Promise Message
| AsyncIOFinished (Async IOResult) IOResult

react :: Reaction -> AsyncState -> (Maybe (Free ActorF (), LocalRef), AsyncState)
data ReactTask
= NothingToDo
| ResumeContinuation (Free ActorF ()) LocalRef
| AdminSendResponse (TMVar Message) Message

react :: Reaction -> AsyncState -> (ReactTask, AsyncState)
react (Response p msg) s =
case Map.lookup p (asyncStateContinuations s) of
Just (k, lref) -> (Just (k (Right msg), lref),
Just (k, lref) -> (ResumeContinuation (k (Right msg)) lref,
s { asyncStateContinuations =
Map.delete p (asyncStateContinuations s) })
Nothing ->
-- XXX: Map.lookup p (developerSend s)?
case Map.lookup p (asyncStateAdminSend s) of
Nothing ->
-- We got a response for something we are not (longer) waiting for.
(NothingToDo, s)
Just returnVar ->
(AdminSendResponse returnVar msg,
s { asyncStateAdminSend =
Map.delete p (asyncStateAdminSend s) })

-- We got a response for something we are not (longer) waiting for.
(Nothing, s)
react (AsyncIOFinished a result) s =
case Map.lookup a (asyncStateAsyncIO s) of
Nothing -> error "react: impossible, unknown async finished."
Just p -> case Map.lookup p (asyncStateContinuations s) of
Nothing ->
-- No continuation was registered for this async.
-- XXX: the async handler should take care for this map deletion...
(Nothing, s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) })
Just (k, lref) -> (Just (k (Left result), lref),
(NothingToDo, s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s) })
Just (k, lref) -> (ResumeContinuation (k (Left result)) lref,
s { asyncStateAsyncIO = Map.delete a (asyncStateAsyncIO s)
, asyncStateContinuations = Map.delete p (asyncStateContinuations s)
})

reactIO :: Reaction -> TVar AsyncState -> IO (Maybe (Free ActorF (), LocalRef))
reactIO :: Reaction -> TVar AsyncState -> IO ReactTask
reactIO r v = atomically (stateTVar v (react r))

------------------------------------------------------------------------

data Event
= Action Action
| Reaction Reaction
| Admin Command

data Command
= Spawn (Message -> Actor) State (TMVar LocalRef)
| AdminInvoke LocalRef Message (TMVar Message)
| AdminSend RemoteRef Message Promise (TMVar Message)

data EventLoop = EventLoop
{ lsName :: EventLoopName
, lsActorMap :: TVar ActorMap
, lsAsyncState :: TVar AsyncState
, lsQueue :: TBQueue Event
, lsTransport :: Transport IO
, lsPids :: TVar [Async ()]
{ lsName :: EventLoopName
, lsActorMap :: TVar ActorMap
, lsAsyncState :: TVar AsyncState
, lsQueue :: TBQueue Event
, lsTransport :: Transport IO
, lsPids :: TVar [Async ()]
, lsNextPromise :: TVar Promise
}

initLoopState :: EventLoopName -> Transport IO -> IO EventLoop
Expand All @@ -247,6 +267,7 @@ initLoopState name t =
<*> newTBQueueIO 128
<*> pure t
<*> newTVarIO []
<*> newTVarIO (Promise 0)

makeEventLoop :: TransportKind -> EventLoopName -> IO EventLoop
makeEventLoop tk name = do
Expand Down Expand Up @@ -301,18 +322,36 @@ handleEvent (Action a) ls = do
handleEvent (Reaction r) ls = do
m <- reactIO r (lsAsyncState ls)
case m of
Nothing -> return ()
Just (a, lref) -> do
NothingToDo -> return ()
ResumeContinuation a lref -> do
as <- atomically $ do
am <- readTVar (lsActorMap ls)
let ((), am', as) = actorMapTurn' 0 [] lref a am -- XXX: promise counter
-- should not always be
-- 0...
p <- readTVar (lsNextPromise ls)
let ((), p', am', as) = actorMapTurn' p [] lref a am
writeTVar (lsActorMap ls) am'
writeTVar (lsNextPromise ls) p'
return as
-- XXX:
-- XXX: Non-atomic update of `lsAsyncState`, should be fixed...
-- XXX
s <- readTVarIO (lsAsyncState ls)
s' <- act (lsName ls) as s (lsTransport ls)
atomically (writeTVar (lsAsyncState ls) s')
AdminSendResponse returnVar msg ->
atomically (putTMVar returnVar msg)
handleEvent (Admin cmd) ls = case cmd of
Spawn a s returnVar -> do
lref <- actorMapSpawnIO a s (lsActorMap ls)
atomically (putTMVar returnVar lref)
AdminInvoke lref msg returnVar -> do
reply <- actorMapPokeIO lref msg (lsActorMap ls)
atomically (putTMVar returnVar reply)
AdminSend rref msg p returnVar -> do
-- XXX: is the `from` field in `Envelope` ever used? If it can be removed
-- then this `dummyAdminRef` hack can be removed too...
let dummyAdminRef = localToRemoteRef (lsName ls) (LocalRef (-1))
transportSend (lsTransport ls)
(Envelope RequestKind dummyAdminRef msg rref (CorrelationId (unPromise p)))
atomically (modifyTVar' (lsAsyncState ls)
(\as -> as { asyncStateAdminSend =
Map.insert p returnVar (asyncStateAdminSend as) }))

0 comments on commit a9ba383

Please sign in to comment.