Skip to content

Commit

Permalink
feat(runtime): towards support for async i/o
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed May 10, 2021
1 parent 99ceaf5 commit 89a1a97
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 65 deletions.
28 changes: 4 additions & 24 deletions src/runtime-prototype/src/StuntDouble/Actor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Actor = Free ActorF (Cont Message)
data Cont a
= Now a
| Later (Async a) (a -> Actor)
| LaterIO (Async IOResult) (IOResult -> Actor)

-- Sketch of later extension:

Expand All @@ -28,7 +29,6 @@ data Cont a
-- | Atleast Int
-- | ...

-- XXX: dup
data IOResult = Unit | String String
deriving Show

Expand All @@ -37,15 +37,16 @@ data ActorF x
| RemoteCall RemoteRef Message (Async Message -> x)
| AsyncIO (IO IOResult) (Async IOResult -> x)
-- | On [(Async a)] Strategy ([a] -> x) (() -> x)
-- | On (Async IOResult) (IOResult -> x) (() -> x)
| Get (State -> x)
| Put State (() -> x)
deriving instance Functor ActorF

on :: Async a -> (a -> Free ActorF ()) -> Free ActorF ()
on = undefined

call' :: LocalRef -> Message -> Free ActorF Message
call' lr m = Free (Call lr m return)
call :: LocalRef -> Message -> Free ActorF Message
call lr m = Free (Call lr m return)

remoteCall :: RemoteRef -> Message -> Free ActorF (Async Message)
remoteCall rr m = Free (RemoteCall rr m return)
Expand All @@ -57,24 +58,3 @@ asyncIO m = Free (AsyncIO m return)
-- XXX:
newtype State = State Int
deriving Num

exampleActor :: LocalRef -> RemoteRef -> Message -> Actor
exampleActor lref rref msg = do
state <- Free (Get return)
reply <- Free (Call lref (Message "help") return)
Free (Put 1 return)
a <- Free (RemoteCall rref (Message ("got: " ++ show msg)) return)
return $ Later a $ \msg -> do
Free (Put 2 return)
return (Now msg)

actorA :: RemoteRef -> Message -> Actor
actorA bref (Message "init") = do
a <- Free (RemoteCall bref (Message "hi") return)
return (Later a (\reply -> return (Now reply)))

actorB :: RemoteRef -> Message -> Actor
actorB aref (Message "hi") = do
-- a <- Free (RemoteCall bref (Message "init") return)
-- return (Later a (\reply -> return (Now (Message "bye"))))
undefined
17 changes: 14 additions & 3 deletions src/runtime-prototype/src/StuntDouble/EventLoop.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import StuntDouble.FreeMonad
import StuntDouble.EventLoop.State
import StuntDouble.EventLoop.Event
import StuntDouble.EventLoop.InboundHandler
import StuntDouble.EventLoop.AsyncIOHandler

------------------------------------------------------------------------

Expand Down Expand Up @@ -51,8 +52,8 @@ initLoopState name transport elog =
<*> newTVarIO []
<*> newTBQueueIO 128
<*> newTVarIO Map.empty
<*> newTVarIO Map.empty
<*> newTVarIO []
<*> newTVarIO Map.empty
<*> pure transport
<*> newTVarIO 0
<*> newTVarIO Map.empty
Expand All @@ -65,8 +66,10 @@ makeEventLoop fp name elog = do
transport <- namedPipeTransport fp name
ls <- initLoopState name transport elog
aInHandler <- async (handleInbound ls)
aAsyncIOHandler <- async (handleAsyncIO ls)
aEvHandler <- async (handleEvents ls)
atomically (modifyTVar' (loopStatePids ls) ([aInHandler, aEvHandler] ++ ))
atomically (modifyTVar' (loopStatePids ls)
([aInHandler, aEvHandler, aAsyncIOHandler] ++))
return (EventLoopRef ls)

handleEvents :: LoopState -> IO ()
Expand All @@ -83,6 +86,7 @@ handleEvents ls = go
handleEvent :: Event -> LoopState -> IO ()
handleEvent (Command c) ls = handleCommand c ls
handleEvent (Receive r) ls = handleReceive r ls
handleEvent (AsyncIODone a r) ls = handleAsyncIODone a r ls

handleCommand :: Command -> LoopState -> IO ()
handleCommand (Spawn actor respVar) ls = atomically $ do
Expand Down Expand Up @@ -126,6 +130,9 @@ handleReceive e ls = case envelopeKind e of
-- continuation.
say ls "installing continuation"
installContinuation async (envelopeReceiver e) (envelopeCorrelationId e) k ls
LaterIO async k -> do
say ls "installing i/o continuation"
-- installIOContinuation async ...

ResponseKind -> do
emit ls (LogReceive (envelopeSender e) (envelopeReceiver e) (envelopeMessage e)
Expand All @@ -150,6 +157,9 @@ handleReceive e ls = case envelopeKind e of
Later {} -> do
undefined

handleAsyncIODone :: Async IOResult -> IOResult -> LoopState -> IO ()
handleAsyncIODone a r ls = undefined

runActor :: LoopState -> RemoteRef -> Free ActorF a -> IO a
runActor ls self = iterM go return
where
Expand Down Expand Up @@ -181,7 +191,8 @@ runActor ls self = iterM go return
emit ls (LogSendStart self rref msg corrId)
k a
go (AsyncIO m k) = do
a <- async m
a <- async m -- XXX: Use `asyncOn` a different capability than the main
-- event loop is running on.
atomically (modifyTVar' (loopStateIOAsyncs ls) (a :))
k a
go (Get k) = do
Expand Down
33 changes: 0 additions & 33 deletions src/runtime-prototype/src/StuntDouble/EventLoop/AsyncHandler.hs

This file was deleted.

22 changes: 22 additions & 0 deletions src/runtime-prototype/src/StuntDouble/EventLoop/AsyncIOHandler.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module StuntDouble.EventLoop.AsyncIOHandler where

import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad
import Data.List

import StuntDouble.EventLoop.State
import StuntDouble.EventLoop.Event

------------------------------------------------------------------------

handleAsyncIO :: LoopState -> IO ()
handleAsyncIO ls = forever go
where
go = atomically $ do
-- XXX: Use waitAnyCatchSTM and handle exceptions appropriately here, e.g.
-- by extending `AsyncIODone` with `Fail` and `Info`.
as <- readTVar (loopStateIOAsyncs ls)
(a, ioResult) <- waitAnySTM as
writeTBQueue (loopStateQueue ls) (AsyncIODone a ioResult)
writeTVar (loopStateIOAsyncs ls) (delete a as)
9 changes: 6 additions & 3 deletions src/runtime-prototype/src/StuntDouble/EventLoop/Event.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module StuntDouble.EventLoop.Event where

import Control.Concurrent.STM
import Control.Concurrent.Async

import StuntDouble.Actor
import StuntDouble.Message
Expand All @@ -11,20 +12,22 @@ import StuntDouble.Reference
------------------------------------------------------------------------

data Event
= Command Command
= Command Command
| Receive Envelope
| AsyncIODone (Async IOResult) IOResult

eventName :: Event -> String
eventName (Command cmd) = "Command/" ++ commandName cmd
eventName (Receive e) = "Receive/" ++ show e
eventName (AsyncIODone _a ioResult) = "AsyncIODone/" ++ show ioResult

data Command
= Spawn (Message -> Actor) (TMVar LocalRef)
| Quit

commandName :: Command -> String
commandName Spawn {} = "Spawn"
commandName Quit {} = "Quit"
commandName Spawn {} = "Spawn"
commandName Quit {} = "Quit"

newtype CorrelationId = CorrelationId Int
deriving (Eq, Ord, Show, Read, Num, Enum)
Expand Down
2 changes: 1 addition & 1 deletion src/runtime-prototype/src/StuntDouble/EventLoop/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ data LoopState = LoopState
-- event loop itself.
, loopStateQueue :: TBQueue Event
, loopStateActors :: TVar (Map LocalRef (Message -> Actor)) -- XXX: Only changed by main loop, so no need for STM?
, loopStateIOHandlers :: TVar (Map (Async IOResult) (LocalRef, IOResult -> Actor))
, loopStateIOAsyncs :: TVar [Async IOResult]
, loopStateIOHandlers :: TVar (Map (Async IOResult) (LocalRef, IOResult -> Actor))
, loopStateTransport :: Transport IO -- Will not change once created, so doesn't need STM?
, loopStateNextCorrelationId :: TVar CorrelationId
, loopStateResponses :: TVar (Map CorrelationId (TMVar Message))
Expand Down
2 changes: 1 addition & 1 deletion src/runtime-prototype/stunt-double.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ library
StuntDouble
StuntDouble.Actor
StuntDouble.EventLoop
StuntDouble.EventLoop.AsyncHandler
StuntDouble.EventLoop.AsyncIOHandler
StuntDouble.EventLoop.Event
StuntDouble.EventLoop.InboundHandler
StuntDouble.EventLoop.State
Expand Down
17 changes: 17 additions & 0 deletions src/runtime-prototype/test/StuntDouble/EventLoopTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module StuntDouble.EventLoopTest where

import Control.Exception
import Control.Concurrent
import Control.Concurrent.Async
import Test.HUnit

Expand All @@ -18,6 +19,11 @@ testActor2 rref (Message "init") = do
a <- remoteCall rref (Message "hi")
return (Later a (\(Message msg) -> return (Now (Message ("Got: " ++ msg)))))

testActor3 :: Message -> Actor
testActor3 (Message "init") = do
a <- asyncIO (threadDelay 1000000 >> return (String "result"))
return (LaterIO a (\(String result) -> return (Now (Message ("Got: " ++ result)))))

eventLoopA :: String -> EventLoopName
eventLoopA suffix = EventLoopName ("event-loop-a" ++ "-" ++ suffix)

Expand Down Expand Up @@ -89,3 +95,14 @@ unit_sendLater = do

quit el1
quit el2

unit_asyncIO :: Assertion
unit_asyncIO = do
elog <- emptyEventLog
let ev = eventLoopA "asyncIO"
el <- makeEventLoop "/tmp" ev elog
catch (do lref <- spawn el testActor3
a <- send el (localToRemoteRef ev lref) (Message "init")
reply <- wait a
reply @?= Message "Got: result")
(\(e :: SomeException) -> dump el >> eventLog el >>= mapM_ print >> print e)

0 comments on commit 89a1a97

Please sign in to comment.