-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #246 from symbiont-io/actors
Actors
- Loading branch information
Showing
16 changed files
with
711 additions
and
258 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
module StuntDouble (module X) where | ||
|
||
import StuntDouble.Actor as X | ||
import StuntDouble.EventLoop as X | ||
import StuntDouble.Message as X | ||
import StuntDouble.EventLoop.Event as X | ||
import StuntDouble.EventLoop.State as X | ||
import StuntDouble.Reference as X |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,219 @@ | ||
{-# LANGUAGE ScopedTypeVariables #-} | ||
|
||
module StuntDouble.EventLoop where | ||
|
||
import Control.Monad | ||
import Data.Map (Map) | ||
import qualified Data.Map as Map | ||
import Control.Exception | ||
import Control.Concurrent | ||
import Control.Concurrent.Async | ||
import Control.Concurrent.STM | ||
import Control.Concurrent.STM.TBQueue | ||
import System.Timeout | ||
|
||
import StuntDouble.Actor | ||
import StuntDouble.Message | ||
import StuntDouble.Reference | ||
import StuntDouble.EventLoop.Transport | ||
import StuntDouble.FreeMonad | ||
import StuntDouble.EventLoop.State | ||
import StuntDouble.EventLoop.Event | ||
import StuntDouble.EventLoop.InboundHandler | ||
|
||
------------------------------------------------------------------------ | ||
|
||
newtype EventLoopRef = EventLoopRef | ||
{ loopRefLoopState :: LoopState } | ||
|
||
eventLog :: EventLoopRef -> IO EventLog | ||
eventLog = fmap reverse . readTVarIO . loopStateEventLog . loopRefLoopState | ||
|
||
emptyEventLog :: IO (TVar EventLog) | ||
emptyEventLog = newTVarIO [] | ||
|
||
dump :: EventLoopRef -> IO () | ||
dump = dumpState . loopRefLoopState | ||
|
||
dummyDeveloperRef :: EventLoopRef -> RemoteRef | ||
dummyDeveloperRef r = dummyDeveloperRef' (loopRefLoopState r) | ||
|
||
dummyDeveloperRef' :: LoopState -> RemoteRef | ||
dummyDeveloperRef' ls = RemoteRef (getEventLoopName (loopStateName ls)) dummyIndex | ||
where | ||
dummyIndex = -1 | ||
|
||
------------------------------------------------------------------------ | ||
|
||
data EventLoopId = EventLoopId | ||
initLoopState :: EventLoopName -> Transport IO -> TVar EventLog -> IO LoopState | ||
initLoopState name transport elog = | ||
LoopState | ||
<$> pure name | ||
<*> newTVarIO [] | ||
<*> newTBQueueIO 128 | ||
<*> newTVarIO Map.empty | ||
<*> newTVarIO Map.empty | ||
<*> newTVarIO [] | ||
<*> pure transport | ||
<*> newTVarIO 0 | ||
<*> newTVarIO Map.empty | ||
<*> newTVarIO Map.empty | ||
<*> newTVarIO Map.empty | ||
<*> pure elog | ||
|
||
makeEventLoop :: FilePath -> EventLoopName -> TVar EventLog -> IO EventLoopRef | ||
makeEventLoop fp name elog = do | ||
transport <- namedPipeTransport fp name | ||
ls <- initLoopState name transport elog | ||
aInHandler <- async (handleInbound ls) | ||
aEvHandler <- async (handleEvents ls) | ||
atomically (modifyTVar' (loopStatePids ls) ([aInHandler, aEvHandler] ++ )) | ||
return (EventLoopRef ls) | ||
|
||
handleEvents :: LoopState -> IO () | ||
handleEvents ls = go | ||
where | ||
go = do | ||
e <- atomically (readTBQueue (loopStateQueue ls)) | ||
say ls ("handleEvents: " ++ eventName e) | ||
handleEvent e ls | ||
`catch` \(exception :: SomeException) -> | ||
say ls ("handleEvents: exception: " ++ show exception) | ||
go | ||
|
||
handleEvent :: Event -> LoopState -> IO () | ||
handleEvent (Command c) ls = handleCommand c ls | ||
handleEvent (Receive r) ls = handleReceive r ls | ||
|
||
handleCommand :: Command -> LoopState -> IO () | ||
handleCommand (Spawn actor respVar) ls = atomically $ do | ||
actors <- readTVar (loopStateActors ls) | ||
let lref = LocalRef (Map.size actors) | ||
writeTVar (loopStateActors ls) (Map.insert lref actor actors) | ||
putTMVar respVar lref | ||
handleCommand Quit ls = do | ||
pids <- atomically (readTVar (loopStatePids ls)) | ||
threadDelay 100000 | ||
mapM_ cancel pids | ||
|
||
data ActorNotFound = ActorNotFound RemoteRef | ||
deriving Show | ||
|
||
instance Exception ActorNotFound where | ||
|
||
handleReceive :: Envelope -> LoopState -> IO () | ||
handleReceive e ls = case envelopeKind e of | ||
RequestKind -> do | ||
say ls ("handleReceive: Request: " ++ show e) | ||
mActor <- lookupActor (remoteToLocalRef (envelopeReceiver e)) (loopStateActors ls) | ||
case mActor of | ||
-- XXX: Throw here or just log and continue? Or take care of this at one | ||
-- layer above transport where we authenticate and verify messages are only | ||
-- going to known actors, i.e. that the remote refs are valid. | ||
Nothing -> throwIO (ActorNotFound (envelopeReceiver e)) | ||
Just actor -> do | ||
cont <- runActor ls (envelopeReceiver e) (actor (envelopeMessage e)) | ||
case cont of | ||
Now replyMsg -> do | ||
emit ls (LogRequest (envelopeSender e) (envelopeReceiver e) (envelopeMessage e) | ||
replyMsg) | ||
say ls (show (replyEnvelope e replyMsg)) | ||
transportSend (loopStateTransport ls) (replyEnvelope e replyMsg) | ||
Later async k -> do | ||
-- The actor has to talk to other remote actors before being able to reply. | ||
|
||
-- We need to save the correlation id of the original request, so | ||
-- that we can write the response to it once we resume our | ||
-- continuation. | ||
say ls "installing continuation" | ||
installContinuation async (envelopeReceiver e) (envelopeCorrelationId e) k ls | ||
|
||
ResponseKind -> do | ||
emit ls (LogReceive (envelopeSender e) (envelopeReceiver e) (envelopeMessage e) | ||
(envelopeCorrelationId e)) | ||
resps <- readTVarIO (loopStateResponses ls) | ||
let respVar = resps Map.! envelopeCorrelationId e | ||
asyncs <- readTVarIO (loopStateWaitingAsyncs ls) | ||
let async = asyncs Map.! envelopeCorrelationId e | ||
mResumption <- recallContinuation async ls | ||
case mResumption of | ||
Nothing -> do | ||
emit ls (LogSendFinish (envelopeCorrelationId e) (envelopeMessage e)) | ||
atomically (putTMVar respVar (envelopeMessage e)) | ||
Just (self, corrId', k) -> do | ||
emit ls (LogComment (show (envelopeCorrelationId e))) | ||
cont <- runActor ls self (k (envelopeMessage e)) | ||
case cont of | ||
Now replyMsg -> do | ||
emit ls (LogSendFinish (envelopeCorrelationId e) replyMsg) | ||
let respVar' = resps Map.! corrId' | ||
atomically (putTMVar respVar' replyMsg) | ||
Later {} -> do | ||
undefined | ||
|
||
runActor :: LoopState -> RemoteRef -> Free ActorF a -> IO a | ||
runActor ls self = iterM go return | ||
where | ||
go :: ActorF (IO a) -> IO a | ||
go (Call lref msg k) = do | ||
Just actor <- lookupActor lref (loopStateActors ls) | ||
Now reply <- runActor ls self (actor msg) | ||
emit ls (LogInvoke self lref msg reply) | ||
k reply | ||
go (RemoteCall rref msg k) = do | ||
(corrId, respTMVar) <- atomically $ do | ||
corrId <- readTVar (loopStateNextCorrelationId ls) | ||
modifyTVar' (loopStateNextCorrelationId ls) succ | ||
respTMVar <- newEmptyTMVar | ||
modifyTVar' (loopStateResponses ls) (Map.insert corrId respTMVar) | ||
return (corrId, respTMVar) | ||
let envelope = Envelope RequestKind self msg rref corrId | ||
say ls ("RemoteCall: " ++ show envelope) | ||
transportSend (loopStateTransport ls) envelope | ||
a <- async $ atomically $ do | ||
resp <- takeTMVar respTMVar -- XXX: timeout? | ||
modifyTVar' (loopStateResponses ls) (Map.delete corrId) | ||
modifyTVar' (loopStateWaitingAsyncs ls) (Map.delete corrId) | ||
return resp | ||
-- Associate the correlation id with the `Async` `a`, so that we can later | ||
-- install continuations for it. | ||
say ls ("correlating remote call `" ++ show corrId ++ "'") | ||
correlateAsync corrId a ls | ||
emit ls (LogSendStart self rref msg corrId) | ||
k a | ||
go (AsyncIO m k) = do | ||
a <- async m | ||
atomically (modifyTVar' (loopStateIOAsyncs ls) (a :)) | ||
k a | ||
go (Get k) = do | ||
undefined | ||
go (Put state' k) = do | ||
undefined | ||
|
||
data Event | ||
= Spawn Actor | ||
| Call LocalRef Message | ||
| Send RemoteRef Message | ||
| Receive Message | ||
quit :: EventLoopRef -> IO () | ||
quit r = atomically $ | ||
writeTBQueue (loopStateQueue (loopRefLoopState r)) (Command Quit) | ||
|
||
data LoopState = LoopState | ||
{ loopStateQueue :: TBQueue Event } | ||
helper :: EventLoopRef -> (TMVar a -> Command) -> IO a | ||
helper r cmd = do | ||
respVar <- atomically $ do | ||
respVar <- newEmptyTMVar | ||
writeTBQueue (loopStateQueue (loopRefLoopState r)) (Command (cmd respVar)) | ||
return respVar | ||
atomically (takeTMVar respVar) | ||
|
||
initLoopState :: IO LoopState | ||
initLoopState = do | ||
queue <- newTBQueueIO 128 | ||
return (LoopState queue) | ||
spawn :: EventLoopRef -> (Message -> Actor) -> IO LocalRef | ||
spawn r actor = helper r (Spawn actor) | ||
|
||
makeEventLoop :: IO EventLoopId | ||
makeEventLoop = do | ||
loopState <- initLoopState | ||
tid <- forkIO $ forever $ undefined loopState | ||
tid' <- forkIO $ forever $ undefined loopState | ||
undefined | ||
invoke :: EventLoopRef -> LocalRef -> Message -> IO Message | ||
invoke r lref msg = | ||
runActor | ||
(loopRefLoopState r) | ||
(dummyDeveloperRef r) | ||
(Free (Call lref msg return)) | ||
|
||
handleAdminCommands :: TBQueue Event -> IO () | ||
handleAdminCommands queue = undefined | ||
send :: EventLoopRef -> RemoteRef -> Message -> IO (Async Message) | ||
send r rref msg = | ||
runActor | ||
(loopRefLoopState r) | ||
(dummyDeveloperRef r) | ||
(Free (RemoteCall rref msg return)) |
Oops, something went wrong.