Skip to content

Commit

Permalink
Merge pull request #250 from symbiont-io/onawait
Browse files Browse the repository at this point in the history
feat(runtime): add await and make state more flexible
  • Loading branch information
symbiont-stevan-andjelkovic authored May 18, 2021
2 parents 9511413 + 49be08c commit a145d35
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 26 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ workflow related stuff.
[Maelstrom](https://github.com/jepsen-io/maelstrom);
* [stateright](https://github.com/stateright/stateright);
* [Spritely goblins](https://spritelyproject.org/#goblins);
* [seaslug](https://github.com/spacejam/seaslug).
* [seaslug](https://github.com/spacejam/seaslug);
* [Syndicate](https://syndicate-lang.org/).

### License

Expand Down
3 changes: 3 additions & 0 deletions src/runtime-prototype/src/StuntDouble.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
module StuntDouble (module X) where

import StuntDouble.Actor as X
import StuntDouble.Actor.State as X
import StuntDouble.EventLoop as X
import StuntDouble.Message as X
import StuntDouble.EventLoop.Event as X
import StuntDouble.EventLoop.Transport as X
import StuntDouble.EventLoop.Transport.Http as X
import StuntDouble.EventLoop.State as X
import StuntDouble.Reference as X
16 changes: 8 additions & 8 deletions src/runtime-prototype/src/StuntDouble/Actor.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE StandaloneDeriving #-}

module StuntDouble.Actor where
Expand All @@ -10,6 +9,7 @@ import Control.Concurrent.Async
import StuntDouble.Message
import StuntDouble.Reference
import StuntDouble.FreeMonad
import StuntDouble.Actor.State

------------------------------------------------------------------------

Expand Down Expand Up @@ -38,8 +38,11 @@ data ActorF x
| AsyncIO (IO IOResult) (Async IOResult -> x)
-- | On [(Async a)] Strategy ([a] -> x) (() -> x)
-- | On (Async IOResult) (IOResult -> x) (() -> x)
| On (Either (Async Message) (Async IOResult)) (Either Message IOResult -> x) (() -> x)
| UnsafeAwait (Either (Async Message) (Async IOResult)) (Either Message IOResult -> x)
| Get (State -> x)
| Put State (() -> x)
-- | Throw Reason (Void -> x)
deriving instance Functor ActorF

on :: Async a -> (a -> Free ActorF ()) -> Free ActorF ()
Expand All @@ -51,6 +54,10 @@ call lr m = Free (Call lr m return)
remoteCall :: RemoteRef -> Message -> Free ActorF (Async Message)
remoteCall rr m = Free (RemoteCall rr m return)

unsafeAwait :: Either (Async Message) (Async IOResult)
-> Free ActorF (Either Message IOResult)
unsafeAwait a = Free (UnsafeAwait a return)

asyncIO :: IO IOResult -> Free ActorF (Async IOResult)
asyncIO m = Free (AsyncIO m return)

Expand All @@ -59,10 +66,3 @@ get = Free (Get return)

put :: State -> Free ActorF ()
put state' = Free (Put state' return)

-- XXX:
newtype State = State { getState :: Int }
deriving (Show, Num)

initState :: State
initState = State 0
30 changes: 30 additions & 0 deletions src/runtime-prototype/src/StuntDouble/Actor/State.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module StuntDouble.Actor.State where

import Data.Aeson.Types
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.Text (Text)

import StuntDouble.Datatype

------------------------------------------------------------------------

newtype State = State { getState :: HashMap Text Datatype }
deriving Show

initState :: State
initState = State HashMap.empty

withHashMap :: (HashMap Text Datatype -> HashMap Text Datatype) -> State -> State
withHashMap f (State hm) = State (f hm)
withHashMap _f _otherwise = error "withHashMap: impossible, invalid state."

setField :: Text -> Datatype -> State -> State
setField k v = withHashMap (HashMap.insert k v)

getField :: Text -> State -> Datatype
getField k (State hm) = hm HashMap.! k
getField _k _otherwise = error "getField: impossible, invalid state."

add :: Text -> Integer -> State -> State
add k v = withHashMap (HashMap.alter (Just . maybe (Integer v) (plus (Integer v))) k)
28 changes: 28 additions & 0 deletions src/runtime-prototype/src/StuntDouble/Datatype.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
module StuntDouble.Datatype where

import Data.Heap (Heap)
import Data.Map (Map)
import Data.Text

------------------------------------------------------------------------

data Datatype
= Unit ()
| Integer Integer
| Double Double
| Bool Bool
| Text Text
| Enum [Text]
| Pair Datatype Datatype
| Inl Datatype
| Inr Datatype
-- | Timestamp UTCTime
| Map (Map Datatype Datatype)
| List [Datatype]
| Heap (Heap Datatype)
deriving Show

------------------------------------------------------------------------

plus :: Datatype -> Datatype -> Datatype
plus (Integer i) (Integer j) = Integer (i + j)
17 changes: 16 additions & 1 deletion src/runtime-prototype/src/StuntDouble/EventLoop.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import Control.Concurrent.STM.TBQueue
import System.Timeout

import StuntDouble.Actor
import StuntDouble.Actor.State
import StuntDouble.Message
import StuntDouble.Reference
import StuntDouble.EventLoop.Transport
Expand Down Expand Up @@ -204,9 +205,23 @@ runActor ls self = iterM go return
k a
go (AsyncIO m k) = do
a <- async m -- XXX: Use `asyncOn` a different capability than the main
-- event loop is running on.
-- event loop is running on. Or queue up the async to some
-- queue which a worker pool, running on different
-- capabilities, process.
atomically (modifyTVar' (loopStateIOAsyncs ls) (a :))
k a
go (On (Left a) c k) = do
let c' = \msg -> c (Left msg)
-- XXX: install continuation
undefined
k ()
go (On (Right a) c k) = undefined
go (UnsafeAwait (Left a) k) = do
reply <- wait a
k (Left reply)
go (UnsafeAwait (Right a) k) = do
x <- wait a
k (Right x)
go (Get k) = do
states <- readTVarIO (loopStateActorState ls)
let state = states Map.! remoteToLocalRef self
Expand Down
1 change: 1 addition & 0 deletions src/runtime-prototype/src/StuntDouble/EventLoop/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Control.Concurrent.Async
import StuntDouble.EventLoop.Event
import StuntDouble.EventLoop.Transport
import StuntDouble.Actor
import StuntDouble.Actor.State
import StuntDouble.Reference
import StuntDouble.Message

Expand Down
18 changes: 12 additions & 6 deletions src/runtime-prototype/stunt-double.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ tested-with: GHC ==8.10.4

flag http
description: Enable http transport support
default: False
default: True

library
hs-source-dirs: src/
exposed-modules:
StuntDouble
StuntDouble.Actor
StuntDouble.Actor.State
StuntDouble.Datatype
StuntDouble.EventLoop
StuntDouble.EventLoop.AsyncIOHandler
StuntDouble.EventLoop.Event
Expand All @@ -50,18 +52,19 @@ library
, stm

build-depends:
async
aeson
, async
, random
, unix
, text
, unordered-containers
, heaps

if flag(http)
build-depends:
aeson
, bytestring
bytestring
, http-client
, http-types
, unordered-containers
, text
, wai
, warp

Expand All @@ -83,7 +86,10 @@ test-suite test
other-modules:
StuntDouble.EventLoop.TransportTest
StuntDouble.EventLoopTest
StuntDouble.SchedulerTest
TastyDiscover
-- TODO(stevan: This doesn't work, because tasty-discovery finds the module
-- and tells cabal it's needed...
if flag(http)
other-modules:
StuntDouble.EventLoop.Transport.HttpTest
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
{-# LANGUAGE ScopedTypeVariables #-}

module StuntDouble.EventLoop.Transport.HttpTest where

import Control.Exception
import Control.Concurrent
import Control.Concurrent.Async
import Test.HUnit

Expand All @@ -13,9 +17,12 @@ unit_httpSendReceive :: IO ()
unit_httpSendReceive = do
let port = 3001
url = "http://localhost:" ++ show port
t <- httpTransport port
let e = Envelope RequestKind (RemoteRef url 0) (Message "msg") (RemoteRef url 1) 0
a <- async (transportSend t e)
e' <- transportReceive t
cancel a
e' @?= e
catch (do t <- httpTransport port
let e = Envelope RequestKind (RemoteRef url 0) (Message "msg") (RemoteRef url 1) 0
-- XXX: add better way to detect when http server is ready...
threadDelay 100000
a <- async (transportSend t e)
e' <- transportReceive t
cancel a
e' @?= e)
(\(e :: SomeException) -> print e)
9 changes: 5 additions & 4 deletions src/runtime-prototype/test/StuntDouble/EventLoopTest.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module StuntDouble.EventLoopTest where
Expand Down Expand Up @@ -116,10 +117,10 @@ unit_asyncIO = do
statefulActor :: Message -> Actor
statefulActor (Message intStr) = do
s <- get
let int :: Int
let int :: Integer
int = read intStr
s' :: State
s' = State int + s
s' = add "x" int s
put s'
return (Now (Message (show (getState s'))))

Expand All @@ -130,7 +131,7 @@ unit_state = do
el <- makeEventLoop "/tmp" ev elog
lref <- spawn el statefulActor
reply <- invoke el lref (Message "1")
reply @?= Message "1"
reply @?= Message "fromList [(\"x\",Integer 1)]"
reply2 <- invoke el lref (Message "2")
reply2 @?= Message "3"
reply2 @?= Message "fromList [(\"x\",Integer 3)]"
quit el
45 changes: 45 additions & 0 deletions src/runtime-prototype/test/StuntDouble/SchedulerTest.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
module StuntDouble.SchedulerTest where

import Control.Concurrent.Async
import Test.HUnit

import StuntDouble

------------------------------------------------------------------------

fakeExecutor :: IO ()
fakeExecutor = do
let port = 3004
t <- httpTransport port
e <- transportReceive t
envelopeMessage e @?= envelopeMessage e -- XXX: check if cmd is of the right shape
let resp = replyEnvelope e (Message "XXX: needs the right shape")
transportSend t resp

fakeScheduler :: RemoteRef -> Message -> Actor
fakeScheduler executor (Message "step") = do
cmd <- undefined -- popHeap
a <- remoteCall executor cmd
resp <- unsafeAwait (Left a)
-- assert resp -- XXX: check if of the right shape
now <- undefined -- get "time" from state
seed <- undefined -- get "seed"
arrivalTime <- undefined -- genArrivalTime now seed
-- pushHeap arrivalTime resp
return (Now (Message "stepped"))

unit_scheduler :: Assertion
unit_scheduler = do
aExecutor <- async fakeExecutor
elog <- emptyEventLog
let ev = EventLoopName "scheduler"
el <- makeEventLoop "/tmp" ev elog

let executorRef = RemoteRef "http://localhost:3004" 0
lref <- spawn el (fakeScheduler executorRef)
a <- send el (localToRemoteRef ev lref) (Message "step")
reply <- wait a
reply @?= Message "stepped"

quit el
cancel aExecutor

0 comments on commit a145d35

Please sign in to comment.