Skip to content

Commit

Permalink
refactor(runtime): add async io
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Apr 28, 2021
1 parent e0e4ade commit 03f3573
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 72 deletions.
37 changes: 22 additions & 15 deletions src/runtime-prototype/src/StuntDouble/Actor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,37 @@ data Cont a
= Now a
| Later (Async a) (a -> Actor)

- -- Sketch of later extension:
-
- -- Later [Async a] Strategy ([Either Exception a] -> Actor)
- -- data Strategy
- -- = Any -- ^ call the continuation as soon as any of the asyncs finishes (or succeeds?).
- -- | All -- ^ call the continuation when all asyncs finish.
- -- | Atleast Int
- -- | ...
-- Sketch of later extension:

-- Later [Async a] Strategy ([Either Exception a] -> Actor)
-- data Strategy
-- = Any -- ^ call the continuation as soon as any of the asyncs finishes (or succeeds?).
-- | All -- ^ call the continuation when all asyncs finish.
-- | Atleast Int
-- | ...

-- XXX: dup
data IOResult = Unit | String String

data ActorF x
= Call LocalRef Message (Message -> x)
| RemoteCall RemoteRef Message (Async Message -> x)
| forall a. AsyncIO (IO a) (Async a -> x)
| AsyncIO (IO IOResult) (Async IOResult -> x)
-- | On [(Async a)] Strategy ([a] -> x) (() -> x)
| Get (State -> x)
| Put State (() -> x)
deriving instance Functor ActorF

on :: Async a -> (a -> Free ActorF ()) -> Free ActorF ()
on = undefined

call :: LocalRef -> Message -> Free ActorF Message
call lr m = Free (Call lr m return)
call' :: LocalRef -> Message -> Free ActorF Message
call' lr m = Free (Call lr m return)

remoteCall :: RemoteRef -> Message -> Free ActorF (Async Message)
remoteCall rr m = Free (RemoteCall rr m return)

asyncIO :: IO a -> Free ActorF (Async a)
asyncIO :: IO IOResult -> Free ActorF (Async IOResult)
asyncIO m = Free (AsyncIO m return)


Expand All @@ -67,9 +73,10 @@ exampleActor lref rref msg = do
actorA :: RemoteRef -> Message -> Actor
actorA bref (Message "init") = do
a <- Free (RemoteCall bref (Message "hi") return)
return (Later a (\reply -> return (Now a)))
return (Later a (\reply -> return (Now reply)))

actorB :: RemoteRef -> Message -> Actor
actorB aref (Message "hi") = do
a <- Free (RemoteCall bref (Message "init") return)
return (Later a (\reply -> return (Now (Message "bye")))
-- a <- Free (RemoteCall bref (Message "init") return)
-- return (Later a (\reply -> return (Now (Message "bye"))))
undefined
46 changes: 17 additions & 29 deletions src/runtime-prototype/src/StuntDouble/EventLoop.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE ExistentialQuantification #-}
module StuntDouble.EventLoop where

import Data.Map (Map)
Expand All @@ -9,49 +10,27 @@ import Control.Concurrent.STM.TBQueue
import StuntDouble.Actor
import StuntDouble.Message
import StuntDouble.Reference
import StuntDouble.EventLoop.State
import StuntDouble.EventLoop.Event
import StuntDouble.EventLoop.RequestHandler

------------------------------------------------------------------------

data Command
= Spawn (Message -> Actor) (TMVar LocalRef)
| Invoke LocalRef Message (TMVar Message)
| Send RemoteRef Message
| Quit

data Response
= Receive AsyncRef Message

-- XXX:
type RequestId = Int
type AsyncRef = (Async Message, RequestId)
type InternalActorRef = Int

data Event = Command Command | Response Response

newtype EventLoopRef = EventLoopRef
{ loopRefLoopState :: LoopState }

data LoopState = LoopState
{ loopStateAsync :: TMVar (Async ()) -- | Hold the `Async` of the event loop itself.
, loopStateQueue :: TBQueue Event
, loopStateActors :: TVar (Map InternalActorRef (Message -> Actor))
, loopStateHandlers :: TVar (Map RequestId (Message -> Actor))
-- , loopStateBlockedResponses :: TVar
-- we also need to have the state for each actor..
}

------------------------------------------------------------------------

initLoopState :: IO LoopState
initLoopState = do
a <- newEmptyTMVarIO
queue <- newTBQueueIO 128
return (LoopState a queue undefined undefined)
return (LoopState a queue undefined undefined undefined undefined)

makeEventLoop :: IO EventLoopRef
makeEventLoop = do
loopState <- initLoopState
-- tid <- forkIO $ forever $ undefined loopState
aReqHandler <- async (handleRequests loopState)
-- tid' <- forkIO $ forever $ undefined loopState
a <- async (handleEvents loopState)
atomically (putTMVar (loopStateAsync loopState) a)
Expand All @@ -67,6 +46,7 @@ handleEvents ls = go

handleEvent :: Event -> LoopState -> IO ()
handleEvent (Command c) ls = handleCommand c ls
handleEvent (Receive r) ls = handleReceive r ls

handleCommand :: Command -> LoopState -> IO ()
handleCommand (Spawn actor respVar) ls = do
Expand All @@ -75,11 +55,18 @@ handleCommand (Invoke lr m respVar) ls = do
-- actor <- findActor ls lr
undefined
handleCommand (Send rr m) ls = do
-- a <- async (makeHttpRequest (translateToUrl rr) (seralise m))
-- atomically (modifyTVar (loopStateAsyncs ls) (a :))
undefined
handleCommand Quit ls = do
a <- atomically (takeTMVar (loopStateAsync ls))
cancel a

handleReceive :: Receive -> LoopState -> IO ()
handleReceive (Request e) ls = do
actor <- undefined -- findActor ls (envelopeReceiver e)
runActor ls (actor (envelopeMessage e))

runActor :: LoopState -> Actor -> IO ()
runActor ls = undefined -- iterM go
where
Expand All @@ -89,8 +76,9 @@ runActor ls = undefined -- iterM go
go (RemoteCall rref msg k) = do
undefined
go (AsyncIO m k) = do
x <- async m -- this should probably register this somewhere?
k x
a <- async m
atomically (modifyTVar (loopStateIOAsyncs ls) (a :))
k a
go (Get k) = do
undefined
go (Put state' k) = do
Expand Down
32 changes: 32 additions & 0 deletions src/runtime-prototype/src/StuntDouble/EventLoop/Event.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
module StuntDouble.EventLoop.Event where

import Control.Concurrent.STM
import Control.Concurrent.Async

import StuntDouble.Actor
import StuntDouble.Message
import StuntDouble.Reference

------------------------------------------------------------------------

data Event = Command Command | Response Response | Receive Request

data Command
= Spawn (Message -> Actor) (TMVar LocalRef)
| Invoke LocalRef Message (TMVar Message)
| Send RemoteRef Message
| Quit

data Response
= IOReady (Async IOResult)
-- Receive (Async Message) Message

data Request = Request Envelope

data Envelope = Envelope
{ envelopeSender :: RemoteRef
, envelopeMessage :: Message
, envelopeReceiver :: RemoteRef
}
deriving (Eq, Show, Read)

33 changes: 6 additions & 27 deletions src/runtime-prototype/src/StuntDouble/EventLoop/RequestHandler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,18 @@ import Control.Monad
import Control.Exception
import Control.Concurrent.Async
import Control.Concurrent.STM

import System.IO
import System.IO.Error
import System.Posix.Files

------------------------------------------------------------------------

-- XXX: dup
type Message = String
type RemoteRef = String
data LoopState = LoopState
{ loopStateQueue :: TBQueue Event
, loopStateAsyncs :: TVar [Async Message]
, loopStateTransport :: Transport IO -- Will not change once created, so
-- doesn't need STM?
}
data Event = Receive Request
data Request = Request Envelope
import StuntDouble.EventLoop.State
import StuntDouble.EventLoop.Event
import StuntDouble.EventLoop.Transport
import StuntDouble.Reference
import StuntDouble.Message

------------------------------------------------------------------------

data Envelope = Envelope
{ envelopeSender :: RemoteRef
, envelopeMessage :: Message
, envelopeReceiver :: RemoteRef
}
deriving (Eq, Show, Read)

data Transport m = Transport
{ send :: Envelope -> m ()
, receive :: m Envelope
}

namedPipeTransport :: FilePath -> IO (Transport IO)
namedPipeTransport fp = do
catchJust
Expand Down Expand Up @@ -71,7 +50,7 @@ handleRequest (Request e) ls = undefined
test :: IO ()
test = do
t <- namedPipeTransport "/tmp/test_request.pipe"
let e = Envelope "from" "msg" "to"
let e = Envelope (RemoteRef "from" 0) (Message "msg") (RemoteRef "to" 1)
a <- async (send t e)
e' <- receive t
cancel a
Expand Down
24 changes: 24 additions & 0 deletions src/runtime-prototype/src/StuntDouble/EventLoop/State.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module StuntDouble.EventLoop.State where

import Data.Map (Map)
import Control.Concurrent.STM
import Control.Concurrent.Async

import StuntDouble.EventLoop.Event
import StuntDouble.EventLoop.Transport
import StuntDouble.Actor
import StuntDouble.Reference
import StuntDouble.Message

------------------------------------------------------------------------


data LoopState = LoopState
{ loopStateAsync :: TMVar (Async ()) -- | Hold the `Async` of the event loop itself.
, loopStateQueue :: TBQueue Event
, loopStateActors :: TVar (Map InternalActorRef (Message -> Actor))
-- , loopStateHandlers :: TVar (Map (Async Message) (Message -> Actor))
, loopStateIOHandlers :: TVar (Map (Async IOResult) (InternalActorRef, IOResult -> Actor))
, loopStateIOAsyncs :: TVar [Async IOResult]
, loopStateTransport :: Transport IO -- Will not change once created, so doesn't need STM?
}
10 changes: 10 additions & 0 deletions src/runtime-prototype/src/StuntDouble/EventLoop/Transport.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module StuntDouble.EventLoop.Transport where

import StuntDouble.EventLoop.Event

------------------------------------------------------------------------

data Transport m = Transport
{ send :: Envelope -> m ()
, receive :: m Envelope
}
2 changes: 1 addition & 1 deletion src/runtime-prototype/src/StuntDouble/Message.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module StuntDouble.Message where

newtype Message = Message String
deriving (Show, Read)
deriving (Eq, Show, Read)
3 changes: 3 additions & 0 deletions src/runtime-prototype/src/StuntDouble/Reference.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ data RemoteRef = RemoteRef
{ address :: String
, index :: Int
}
deriving (Eq, Show, Read)

localToRemoteRef :: String -> LocalRef -> RemoteRef
localToRemoteRef address (LocalRef i) = RemoteRef address i

type InternalActorRef = Int
3 changes: 3 additions & 0 deletions src/runtime-prototype/stunt-double.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ library
StuntDouble.EventLoop
StuntDouble.EventLoop.AsyncHandler
StuntDouble.EventLoop.RequestHandler
StuntDouble.EventLoop.State
StuntDouble.EventLoop.Event
StuntDouble.EventLoop.Transport
StuntDouble.Message
StuntDouble.Reference
StuntDouble.Supervisor
Expand Down

0 comments on commit 03f3573

Please sign in to comment.