diff --git a/Cargo.toml b/Cargo.toml index 9e4cab3..42963ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,4 +35,5 @@ members = [ "aktoro-runtime", "examples/hello_world", + "examples/net", ] diff --git a/aktoro-channel/src/channel.rs b/aktoro-channel/src/channel.rs index f8b82b5..b0c0f64 100644 --- a/aktoro-channel/src/channel.rs +++ b/aktoro-channel/src/channel.rs @@ -1,21 +1,35 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; -use std::task::Waker; +use std::sync::Arc; +use std::task; use crossbeam_queue::SegQueue; +use crossbeam_utils::atomic::AtomicCell; use crate::counters::Counters; use crate::error::*; use crate::message::Message; use crate::queue::Queue; +type Waker = Arc)>>; + /// A channel allowing senders to pass /// messages over it, and receivers to /// retrieve them. pub(crate) struct Channel { + /// The queue that is holding the + /// messages that have not been + /// received yet. pub(crate) queue: Queue>, + /// Whether the channel is closed. pub(crate) closed: AtomicBool, + /// The counters used to store the + /// current number of senders, + /// receivers, messages and the + /// channel's limits. pub(crate) counters: Counters, + /// A list of the wakers that can + /// be used to wake up receivers. pub(crate) wakers: SegQueue, } @@ -71,9 +85,7 @@ impl Channel { } } - /// Registers a new waker to be - /// notified when a new message is - /// available. + /// Registers a new waker. pub(crate) fn register(&self, waker: Waker) { self.wakers.push(waker); } @@ -82,7 +94,17 @@ impl Channel { /// available. fn notify(&self) { if let Ok(waker) = self.wakers.pop() { - waker.wake(); + match waker.swap((true, None)) { + (true, Some(waker_)) => { + self.wakers.push(waker); + waker_.wake(); + } + (true, None) => { + self.wakers.push(waker); + self.notify(); + } + _ => self.notify(), + } } } diff --git a/aktoro-channel/src/error.rs b/aktoro-channel/src/error.rs index 7799570..b2d64e6 100644 --- a/aktoro-channel/src/error.rs +++ b/aktoro-channel/src/error.rs @@ -1,4 +1,4 @@ -use std::error::Error as StdError; +use std::error; use std::fmt; use std::fmt::Debug; use std::fmt::Display; @@ -207,11 +207,11 @@ impl TryRecvError { } } -impl StdError for CloneError {} +impl error::Error for CloneError {} -impl StdError for TrySendError {} +impl error::Error for TrySendError {} -impl StdError for TryRecvError {} +impl error::Error for TryRecvError {} impl Display for CloneError { fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { diff --git a/aktoro-channel/src/queue.rs b/aktoro-channel/src/queue.rs index 605cb83..7c77dab 100644 --- a/aktoro-channel/src/queue.rs +++ b/aktoro-channel/src/queue.rs @@ -1,8 +1,12 @@ use crossbeam_queue::ArrayQueue; use crossbeam_queue::SegQueue; +/// The queue used by the channels to send +/// and receive data. pub(crate) enum Queue { + /// The bounded queue variant. Bounded(ArrayQueue), + /// The unbounded queue variant. Unbounded(SegQueue), } diff --git a/aktoro-channel/src/receiver.rs b/aktoro-channel/src/receiver.rs index d56cfe8..40bb3d7 100644 --- a/aktoro-channel/src/receiver.rs +++ b/aktoro-channel/src/receiver.rs @@ -2,7 +2,9 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; +use std::task::Waker; +use crossbeam_utils::atomic::AtomicCell; use futures_core::FusedStream; use futures_core::Stream; @@ -15,7 +17,15 @@ use crate::error::*; /// implementation. /// /// [`try_recv`]: #method.try_recv -pub struct Receiver(Option>>); +pub struct Receiver { + /// The channel the receiver will + /// receive data from. + channel: Option>>, + /// A reference to the space the + /// receiver was assigned to store + /// its waker. + waker: Arc)>>, +} impl Receiver { /// Creates a receiver from a pointer @@ -26,13 +36,20 @@ impl Receiver { // immediately after a channel's // creation. channel.counters.add_recver().expect("receivers limit == 0"); - Receiver(Some(channel)) + + let waker = Arc::new(AtomicCell::new((true, None))); + channel.register(waker.clone()); + + Receiver { + waker, + channel: Some(channel), + } } /// Tries to receive a message from /// the channel. pub fn try_recv(&self) -> Result, TryRecvError> { - if let Some(channel) = &self.0 { + if let Some(channel) = &self.channel { match channel.try_recv() { Ok(Some(msg)) => Ok(Some(msg.unwrap())), Ok(None) => Ok(None), @@ -46,7 +63,7 @@ impl Receiver { /// Whether the channel the receiver /// is connected to is closed. pub fn is_closed(&self) -> bool { - if let Some(channel) = &self.0 { + if let Some(channel) = &self.channel { channel.check_is_closed() } else { true @@ -56,7 +73,9 @@ impl Receiver { /// Closes the channel the receiver /// is connected to. pub fn close_channel(&self) { - if let Some(channel) = &self.0 { + self.waker.store((false, None)); + + if let Some(channel) = &self.channel { channel.close() } } @@ -64,7 +83,9 @@ impl Receiver { /// Disconnects the receiver from the /// channel. pub fn disconnect(&mut self) { - let channel = if let Some(channel) = self.0.take() { + self.waker.store((false, None)); + + let channel = if let Some(channel) = self.channel.take() { channel } else { return; @@ -79,9 +100,14 @@ impl Receiver { /// returning a new receiver connected to /// the same channel, or an error. pub fn try_clone(&self) -> Result { - if let Some(channel) = &self.0 { + if let Some(channel) = &self.channel { if channel.counters.add_recver().is_ok() { - Ok(Receiver(Some(channel.clone()))) + let waker = Arc::new(AtomicCell::new((true, None))); + + Ok(Receiver { + waker, + channel: Some(channel.clone()), + }) } else { Err(CloneError::limit()) } @@ -95,7 +121,9 @@ impl Stream for Receiver { type Item = T; fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { - if let Some(channel) = &self.0 { + self.waker.store((true, None)); + + if let Some(channel) = &self.channel { // We try to receive a message... match channel.try_recv() { // ...and return it if one is @@ -104,7 +132,8 @@ impl Stream for Receiver { // ...or register the stream's // waker if none is available... Ok(None) => { - channel.register(ctx.waker().clone()); + self.waker.store((true, Some(ctx.waker().clone()))); + Poll::Pending } // ...or stop the stream if @@ -119,7 +148,7 @@ impl Stream for Receiver { impl FusedStream for Receiver { fn is_terminated(&self) -> bool { - if let Some(channel) = &self.0 { + if let Some(channel) = &self.channel { channel.is_closed() && channel.is_empty() } else { true diff --git a/aktoro-channel/tests/tests.rs b/aktoro-channel/tests/tests.rs deleted file mode 100644 index 6326adc..0000000 --- a/aktoro-channel/tests/tests.rs +++ /dev/null @@ -1,185 +0,0 @@ -#![feature(async_await)] - -use aktoro_channel::*; -use futures_util::poll; -use futures_util::StreamExt; - -#[test] -fn base() { - let (sender, recver) = Builder::new().build(); - works(&sender, &recver); -} - -#[runtime::test] -async fn async_base() { - let (sender, recver) = Builder::new().build(); - async_works(&sender, &recver).await; -} - -#[runtime::test] -async fn notify() { - let (sender, recver) = Builder::new().build(); - async_works(&sender, &recver).await; - - let mut notify = sender.try_send_notify(2).unwrap(); - assert!(poll!(&mut notify).is_pending()); - - assert_eq!(recver.try_recv(), Ok(Some(2))); - assert!(poll!(&mut notify).is_ready()); -} - -#[test] -fn disconnect_sender() { - let (mut sender, recver) = Builder::new().build(); - works(&sender, &recver); - - assert_eq!(sender.try_send(2), Ok(())); - - sender.disconnect(); - - assert!(sender.is_closed()); - assert!(recver.is_closed()); - - assert_eq!(recver.try_recv(), Ok(Some(2))); - assert!(recver.try_recv().unwrap_err().is_closed()); -} - -#[test] -fn disconnect_recver() { - let (sender, mut recver) = Builder::new().build(); - works(&sender, &recver); - - recver.disconnect(); - - assert!(sender.is_closed()); - assert!(recver.is_closed()); -} - -#[test] -fn close_sender() { - let (sender, recver) = Builder::new().build(); - works(&sender, &recver); - - assert_eq!(sender.try_send(2), Ok(())); - - sender.close_channel(); - - assert!(sender.is_closed()); - assert!(recver.is_closed()); - - assert_eq!(recver.try_recv(), Ok(Some(2))); - assert!(sender.try_send(1).unwrap_err().is_closed()); - assert!(recver.try_recv().unwrap_err().is_closed()); -} - -#[test] -fn close_recver() { - let (sender, recver) = Builder::new().build(); - works(&sender, &recver); - - assert_eq!(sender.try_send(2), Ok(())); - - recver.close_channel(); - - assert!(sender.is_closed()); - assert!(recver.is_closed()); - - assert_eq!(recver.try_recv(), Ok(Some(2))); - assert!(sender.try_send(1).unwrap_err().is_closed()); - assert!(recver.try_recv().unwrap_err().is_closed()); -} - -#[test] -fn drop_senders() { - let (sender, recver) = Builder::new().build(); - works(&sender, &recver); - - assert_eq!(sender.try_send(2), Ok(())); - - drop(sender); - - assert!(recver.is_closed()); - assert_eq!(recver.try_recv(), Ok(Some(2))); - assert!(recver.try_recv().unwrap_err().is_closed()); -} - -#[test] -fn drop_recvers() { - let (sender, recver) = Builder::new().build(); - works(&sender, &recver); - - drop(recver); - - assert!(sender.is_closed()); - assert!(sender.try_send(2).unwrap_err().is_closed()); -} - -#[test] -fn msgs_limit() { - let (sender, recver) = Builder::new().limited_msgs(3).build(); - works(&sender, &recver); - - assert_eq!(sender.try_send(2), Ok(())); - assert!(sender.try_send(3).unwrap_err().is_limit()); -} - -#[test] -fn senders_limit() { - let (sender, recver) = Builder::new().limited_senders(3).build(); - works(&sender, &recver); - - let _2 = sender.try_clone().unwrap(); - let _1 = sender.try_clone().unwrap(); - - assert!(sender.try_clone().is_err()); -} - -#[test] -fn recvers_limit() { - let (sender, recver) = Builder::new().limited_receivers(3).build(); - works(&sender, &recver); - - let _2 = recver.try_clone().unwrap(); - let _1 = recver.try_clone().unwrap(); - - assert!(recver.try_clone().is_err()); -} - -fn works(sender: &Sender, recver: &Receiver) { - let (sender1, recver1) = (sender.try_clone().unwrap(), recver.try_clone().unwrap()); - - let (sender2, recver2) = (sender1.try_clone().unwrap(), recver1.try_clone().unwrap()); - - assert_eq!(recver1.try_recv(), Ok(None)); - assert_eq!(recver2.try_recv(), Ok(None)); - - assert_eq!(sender1.try_send(0), Ok(())); - assert_eq!(sender2.try_send(1), Ok(())); - - assert_eq!(recver2.try_recv(), Ok(Some(0))); - assert_eq!(recver1.try_recv(), Ok(Some(1))); - - assert_eq!(recver1.try_recv(), Ok(None)); - assert_eq!(recver2.try_recv(), Ok(None)); -} - -async fn async_works<'c>(sender: &'c Sender, recver: &'c Receiver) { - let (sender1, mut recver1) = (sender.try_clone().unwrap(), recver.try_clone().unwrap()); - - let (sender2, mut recver2) = (sender1.try_clone().unwrap(), recver1.try_clone().unwrap()); - - let mut next1 = recver1.next(); - let mut next2 = recver2.next(); - - assert!(poll!(&mut next1).is_pending()); - assert!(poll!(&mut next2).is_pending()); - - assert_eq!(sender1.try_send(0), Ok(())); - assert_eq!(sender2.try_send(1), Ok(())); - - assert_eq!(next1.await, Some(0)); - assert_eq!(next2.await, Some(1)); - - assert_eq!(recver1.try_recv(), Ok(None)); - assert_eq!(recver2.try_recv(), Ok(None)); -} diff --git a/aktoro-context/Cargo.toml b/aktoro-context/Cargo.toml index 0bf5f5d..7cf1c20 100644 --- a/aktoro-context/Cargo.toml +++ b/aktoro-context/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" [dependencies] crossbeam-utils = "0.6" futures-core-preview = "0.3.0-alpha.17" +futures-io-preview = "0.3.0-alpha.17" futures-util-preview = "0.3.0-alpha.17" [dependencies.aktoro-channel] diff --git a/aktoro-context/src/channel.rs b/aktoro-context/src/channel.rs index e2b7ca6..bd6aa2b 100644 --- a/aktoro-context/src/channel.rs +++ b/aktoro-context/src/channel.rs @@ -1,5 +1,5 @@ use std::pin::Pin; -use std::task::Context as FutContext; +use std::task; use std::task::Poll; use aktoro_channel as channel; @@ -40,7 +40,7 @@ pub(crate) fn new() -> (Sender, Receiver) { impl raw::Sender for Sender where - A: raw::Actor, + A: raw::Actor + 'static, { type Receiver = Receiver; @@ -49,7 +49,7 @@ where fn try_send(&mut self, msg: M) -> raw::SenderRes where A: raw::Handler, - M: Send, + M: Send + 'static, { let (msg, recv) = Message::new(msg); @@ -67,7 +67,7 @@ where { type Item = Box>; - fn poll_next(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll> { + fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll> { Pin::new(&mut self.get_mut().0).poll_next(ctx) } } diff --git a/aktoro-context/src/context.rs b/aktoro-context/src/context.rs index 365f752..5fb882a 100644 --- a/aktoro-context/src/context.rs +++ b/aktoro-context/src/context.rs @@ -1,12 +1,18 @@ use std::collections::VecDeque; +use std::future::Future; use std::pin::Pin; -use std::task::Context as FutContext; +use std::task; use std::task::Poll; use aktoro_channel::error::TrySendError; +use aktoro_channel::Notify; use aktoro_raw as raw; use aktoro_raw::Updater as RawUpdater; +use aktoro_raw::Wait as RawWait; use futures_core::Stream; +use futures_io as io; +use futures_io::AsyncRead; +use futures_io::AsyncWrite; use crate::channel; use crate::channel::Receiver; @@ -15,14 +21,39 @@ use crate::control; use crate::control::Controlled; use crate::control::Controller; use crate::event::Event; +use crate::message::AsyncMessageFut; +use crate::message::AsyncMessageStream; +use crate::message::AsyncReadStream; +use crate::message::AsyncWriteFut; use crate::update; +use crate::update::Update; use crate::update::Updated; use crate::update::Updater; +/// The configuration that is used by [`Context`]. +/// +/// ## Note +/// +/// This is only used when spawning sub-actors and +/// shoudln't be used elsewhere. +pub struct ContextConfig { + /// Whether the context should wait to get + /// notified before starting to handle + /// messages, events, etc. + ready: Option, +} + /// An actor context using the [`aktoro-channel`] crate. /// /// [`aktoro-channel`]: https://docs.rs/aktoro-channel -pub struct Context { +pub struct Context { + /// The identifier of the actor, as used by + /// the runtime. + actor_id: u64, + /// Whether the context should wait to get + /// notified before starting to handle + /// messages, events, etc. + ready: Option, /// The actor's current status. status: A::Status, /// An actor's control channel sender. @@ -32,6 +63,27 @@ pub struct Context { /// Whether the status has been recently updated /// and the runtime should be notified. update: bool, + /// A list of futures that should be fully + /// executed before handling messages, events, + /// non-blocking futures, etc. + b_futs: Vec>>>, + /// A list of futures that the context should + /// give the output to the actor as a message. + futs: Vec>>>, + /// A list of streams that the context should + /// give the yielded items to the actor as + /// messages. + streams: Vec>>>, + /// A list of asynchronous readers that the + /// context should forward the data to the + /// actor as messages. + reads: Vec>>>, + /// An eventual inner runtime that the context + /// can use to run/spawn sub-actors. + rt: Option, + /// A list of contexts that should be notified + /// when all blocking futures have been handled. + to_notify: Vec, /// A list of the actor's unhandled events. events: VecDeque>>, /// An actor's message channel sender. @@ -44,15 +96,18 @@ pub struct Context { updted: Option>, } -impl raw::Context for Context +impl raw::Context for Context where - A: raw::Actor, + A: raw::Actor + 'static, + RT: raw::Runtime, { + type Config = ContextConfig; + type Controller = Controller; type Sender = Sender; type Updater = Updater; - fn new() -> Context { + fn new(actor_id: u64, config: ContextConfig) -> Context { // We create the actor's control, message and // update channels. let (ctrler, ctrled) = control::new(); @@ -60,10 +115,18 @@ where let (updter, updted) = update::new(); Context { + actor_id, + ready: config.ready, status: A::Status::default(), ctrler, ctrled, update: false, + b_futs: vec![], + futs: vec![], + streams: vec![], + reads: vec![], + rt: None, + to_notify: vec![], events: VecDeque::new(), sender, recver, @@ -72,6 +135,10 @@ where } } + fn actor_id(&self) -> u64 { + self.actor_id + } + fn emit(&mut self, event: E) where A: raw::EventHandler, @@ -91,9 +158,10 @@ where } } - fn update(&mut self) -> Result<(), TrySendError> { + fn update(&mut self) -> Result<(), TrySendError>> { self.update = false; - self.updter.try_send(self.status.clone()) + self.updter + .try_send(Update::new(self.actor_id, self.status.clone())) } fn controller(&self) -> &Controller { @@ -123,16 +191,176 @@ where fn updater(&mut self) -> &mut Updater { &mut self.updter } + + fn actors(&self) -> Vec { + if let Some(rt) = &self.rt { + rt.actors() + } else { + vec![] + } + } + + fn spawn(&mut self, actor: S) -> Option> + where + S: raw::Actor + 'static, + C: raw::Context, + { + let rt = if let Some(rt) = &mut self.rt { + rt + } else { + self.rt = Some(RT::default()); + self.rt.as_mut().unwrap() + }; + + let mut config = ContextConfig::default(); + + let (notify, ready) = Notify::new(); + config.ready = Some(ready); + + if let Some(spawned) = rt.spawn_with(actor, config) { + self.to_notify.push(notify); + Some(spawned) + } else { + None + } + } + + fn wait(&mut self, fut: Pin>, map: M) -> raw::Cancellable + where + F: Future + Unpin + Send + 'static, + M: Fn(O) -> T + Unpin + Send + Sync + 'static, + A: raw::Handler, + O: Send + 'static, + T: Send + 'static, + { + let (cancellable, inner) = raw::Cancellable::new(fut); + + self.futs.push(Box::pin(AsyncMessageFut::new(inner, map))); + + cancellable + } + + fn blocking_wait(&mut self, fut: Pin>, map: M) -> raw::Cancellable + where + F: Future + Unpin + Send + 'static, + M: Fn(O) -> T + Unpin + Send + Sync + 'static, + A: raw::Handler, + O: Send + 'static, + T: Send + 'static, + { + let (cancellable, inner) = raw::Cancellable::new(fut); + self.b_futs.push(Box::pin(AsyncMessageFut::new(inner, map))); + + cancellable + } + + fn subscribe(&mut self, stream: Pin>, map: M) -> raw::Cancellable + where + S: Stream + Unpin + Send + 'static, + M: Fn(I) -> T + Unpin + Send + Sync + 'static, + A: raw::Handler, + I: Send + 'static, + T: Send + 'static, + { + let (cancellable, inner) = raw::Cancellable::new(stream); + + self.streams + .push(Box::pin(AsyncMessageStream::new(inner, map))); + + cancellable + } + + fn read( + &mut self, + read: Pin>, + cap: usize, + map: M, + map_err: N, + ) -> raw::Cancellable + where + R: AsyncRead + Unpin + Send + 'static, + M: Fn(Vec) -> T + Unpin + Send + Sync + 'static, + N: Fn(io::Error) -> E + Unpin + Send + Sync + 'static, + A: raw::Handler + raw::Handler, + T: Send + 'static, + E: Send + 'static, + { + let (cancellable, inner) = raw::Cancellable::new(read); + + self.reads + .push(Box::pin(AsyncReadStream::new(inner, cap, map, map_err))); + + cancellable + } + + fn write( + &mut self, + write: Pin>, + data: Vec, + map: M, + map_err: N, + ) -> raw::Cancellable + where + W: AsyncWrite + Unpin + Send + 'static, + M: Fn((Vec, usize), Pin>) -> T + Unpin + Send + Sync + 'static, + N: Fn(io::Error) -> E + Unpin + Send + Sync + 'static, + A: raw::Handler + raw::Handler, + T: Send + 'static, + E: Send + 'static, + { + let (cancellable, inner) = raw::Cancellable::new(write); + + self.futs + .push(Box::pin(AsyncWriteFut::new(inner, data, map, map_err))); + + cancellable + } + + fn blocking_write( + &mut self, + write: Pin>, + data: Vec, + map: M, + map_err: N, + ) -> raw::Cancellable + where + W: AsyncWrite + Unpin + Send + 'static, + M: Fn((Vec, usize), Pin>) -> T + Unpin + Send + Sync + 'static, + N: Fn(io::Error) -> E + Unpin + Send + Sync + 'static, + A: raw::Handler + raw::Handler, + T: Send + 'static, + E: Send + 'static, + { + let (cancellable, inner) = raw::Cancellable::new(write); + + self.b_futs + .push(Box::pin(AsyncWriteFut::new(inner, data, map, map_err))); + + cancellable + } } -impl Stream for Context +impl Stream for Context where A: raw::Actor, + R: raw::Runtime, { type Item = raw::Work; - fn poll_next(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll>> { + fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll>> { let context = self.get_mut(); + let mut ret = None; + + // If the context hasn't been marked as being + // ready yet, we try to see if it should be now. + if let Some(ready) = context.ready.as_mut() { + match Pin::new(ready).poll(ctx) { + Poll::Ready(()) => { + context.ready.take(); + } + Poll::Pending => return Poll::Pending, + } + } // If an action has been received an action has // been received from the actor's control channel, @@ -152,6 +380,45 @@ where return Poll::Ready(Some(raw::Work::Update)); } + // We try to poll all blocking futures until they + // all finished executing, making the actor handle + // the returned messages as they are yielded. + let mut to_remove = vec![]; + for (i, fut) in context.b_futs.iter_mut().enumerate() { + match fut.as_mut().poll(ctx) { + Poll::Ready(Some(msg)) => { + to_remove.push(i); + ret = Some(raw::Work::Message(msg)); + + break; + } + Poll::Ready(None) => to_remove.push(i), + Poll::Pending => (), + } + } + + // We remove the fully executed futures. + for to_remove in to_remove { + context.b_futs.remove(to_remove); + } + + // We eventually return the output of the first + // fully executed future... + if let Some(ret) = ret { + return Poll::Ready(Some(ret)); + // ...or we wait if all blocking futures havn't + // fully executed. + } else if !context.b_futs.is_empty() { + return Poll::Pending; + } + + // If there are no more blocking futures, we + // notify the context of all the recently + // spawned actors that they are ready. + for to_notify in context.to_notify.drain(..) { + to_notify.done(); + } + // If the actor has unhandled events, we ask the // runtime to make the actor handle the oldest // one. @@ -170,6 +437,115 @@ where Poll::Pending => (), } + // We poll the inner runtime if there is one. + if let Some(rt) = context.rt.take() { + let mut wait = rt.wait(); + + // We poll until there is either... + loop { + // ...no running actors or... + if wait.runtime().actors().is_empty() { + break; + } + + // ...the runtime is waiting for them + // to yield. + if let Poll::Pending = Pin::new(&mut wait).poll_next(ctx) { + break; + } + } + + context.rt = Some(wait.into_runtime()); + } + + // We poll all the futures that the context + // was asked to handle. + let mut to_remove = vec![]; + for (i, fut) in context.futs.iter_mut().enumerate() { + match fut.as_mut().poll(ctx) { + Poll::Ready(Some(msg)) => { + to_remove.push(i); + ret = Some(raw::Work::Message(msg)); + + break; + } + Poll::Ready(None) => to_remove.push(i), + Poll::Pending => (), + } + } + + // We remove the fully executed futures... + for to_remove in to_remove { + context.futs.remove(to_remove); + } + + // ...and return the output of the futures + // that returned one. + if let Some(ret) = ret { + return Poll::Ready(Some(ret)); + } + + // We poll all the streams that the context + // was asked to handle. + let mut to_remove = vec![]; + for (i, stream) in context.streams.iter_mut().enumerate() { + match stream.as_mut().poll_next(ctx) { + Poll::Ready(Some(msg)) => ret = Some(raw::Work::Message(msg)), + Poll::Ready(None) => to_remove.push(i), + Poll::Pending => (), + } + } + + // We remove all the closed streams... + for to_remove in to_remove { + context.streams.remove(to_remove); + } + + // ...and return the yielded items. + if let Some(ret) = ret { + return Poll::Ready(Some(ret)); + } + + // We poll all the asynchronous readers that + // the context was asked to handle. + let mut to_remove = vec![]; + for (i, read) in context.reads.iter_mut().enumerate() { + match read.as_mut().poll_read(ctx) { + Poll::Ready(Some(msg)) => ret = Some(raw::Work::Message(msg)), + Poll::Ready(None) => to_remove.push(i), + Poll::Pending => (), + } + } + + // We remove all the closed readers... + for to_remove in to_remove { + context.reads.remove(to_remove); + } + + // ...and transfer the data read to the + // actor. + if let Some(ret) = ret { + return Poll::Ready(Some(ret)); + } + Poll::Pending } } + +impl Default for ContextConfig { + fn default() -> Self { + ContextConfig { ready: None } + } +} + +impl Drop for Context +where + A: raw::Actor, + R: raw::Runtime, +{ + fn drop(&mut self) { + if let Some(rt) = &mut self.rt { + rt.stop(); + } + } +} diff --git a/aktoro-context/src/control.rs b/aktoro-context/src/control.rs index e1eddf7..d9f5a25 100644 --- a/aktoro-context/src/control.rs +++ b/aktoro-context/src/control.rs @@ -1,5 +1,5 @@ use std::pin::Pin; -use std::task::Context as FutContext; +use std::task; use std::task::Poll; use aktoro_channel as channel; @@ -40,7 +40,7 @@ pub(crate) fn new() -> (Controller, Controlled) { impl raw::Controller for Controller where - A: raw::Actor, + A: raw::Actor + 'static, { type Controlled = Controlled; @@ -69,7 +69,7 @@ where fn poll_next( self: Pin<&mut Self>, - ctx: &mut FutContext, + ctx: &mut task::Context, ) -> Poll>>> { Pin::new(&mut self.get_mut().0).poll_next(ctx) } diff --git a/aktoro-context/src/message.rs b/aktoro-context/src/message.rs index c553b81..bbc365a 100644 --- a/aktoro-context/src/message.rs +++ b/aktoro-context/src/message.rs @@ -1,4 +1,14 @@ +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task; +use std::task::Poll; + use aktoro_raw as raw; +use futures_core::Stream; +use futures_io as io; +use futures_io::AsyncRead; +use futures_io::AsyncWrite; use crate::respond::Respond; @@ -7,16 +17,84 @@ use crate::respond::Respond; pub(crate) struct Message where A: raw::Handler, - M: Send + 'static, + M: Send, { msg: Option, resp: Option>, } +pub(crate) struct AsyncMessage +where + A: raw::Handler, + M: Send, +{ + msg: Option, + _act: PhantomData, +} + +pub(crate) struct AsyncMessageFut +where + A: raw::Handler, + F: Future + Unpin + Send, + M: Fn(O) -> T + Send, + O: Send, + T: Send, +{ + inner: raw::CancellableInner, + map: M, + _act: PhantomData, +} + +pub(crate) struct AsyncMessageStream +where + A: raw::Handler, + S: Stream + Unpin + Send, + M: Fn(I) -> T + Send, + I: Send, + T: Send, +{ + inner: raw::CancellableInner, + map: M, + _act: PhantomData, +} + +pub(crate) struct AsyncReadStream +where + A: raw::Handler + raw::Handler, + R: AsyncRead + Unpin + Send, + M: Fn(Vec) -> T + Send, + N: Fn(io::Error) -> E + Send, + T: Send, + E: Send, +{ + cap: usize, + buf: Vec, + inner: raw::CancellableInner, + map: M, + map_err: N, + _act: PhantomData, +} + +pub(crate) struct AsyncWriteFut +where + A: raw::Handler + raw::Handler, + W: AsyncWrite + Unpin + Send, + M: Fn((Vec, usize), Pin>) -> T + Unpin + Send + Sync + 'static, + N: Fn(io::Error) -> E + Unpin + Send + Sync + 'static, + T: Send, + E: Send, +{ + data: Option>, + map: M, + map_err: N, + inner: raw::CancellableInner, + _act: PhantomData, +} + impl Message where A: raw::Handler, - M: Send + 'static, + M: Send, { pub(crate) fn new(msg: M) -> (Self, Respond) { let resp = Respond::new(); @@ -31,6 +109,94 @@ where } } +impl AsyncMessage +where + A: raw::Handler, + M: Send, +{ + pub(crate) fn new(msg: M) -> Self { + AsyncMessage { + msg: Some(msg), + _act: PhantomData, + } + } +} + +impl AsyncMessageFut +where + A: raw::Handler, + F: Future + Unpin + Send, + M: Fn(O) -> T + Unpin + Send, + O: Send, + T: Send, +{ + pub(crate) fn new(inner: raw::CancellableInner, map: M) -> Self { + AsyncMessageFut { + inner, + map, + _act: PhantomData, + } + } +} + +impl AsyncMessageStream +where + A: raw::Handler, + S: Stream + Unpin + Send, + M: Fn(I) -> T + Unpin + Send, + I: Send, + T: Send, +{ + pub(crate) fn new(inner: raw::CancellableInner, map: M) -> Self { + AsyncMessageStream { + inner, + map, + _act: PhantomData, + } + } +} + +impl AsyncReadStream +where + A: raw::Handler + raw::Handler, + R: AsyncRead + Unpin + Send, + M: Fn(Vec) -> T + Unpin + Send, + N: Fn(io::Error) -> E + Unpin + Send, + T: Send, + E: Send, +{ + pub(crate) fn new(inner: raw::CancellableInner, cap: usize, map: M, map_err: N) -> Self { + AsyncReadStream { + cap, + buf: vec![0; cap], + inner, + map, + map_err, + _act: PhantomData, + } + } +} + +impl AsyncWriteFut +where + A: raw::Handler + raw::Handler, + W: AsyncWrite + Unpin + Send, + M: Fn((Vec, usize), Pin>) -> T + Unpin + Send + Sync, + N: Fn(io::Error) -> E + Unpin + Send + Sync, + T: Send, + E: Send, +{ + pub(crate) fn new(inner: raw::CancellableInner, data: Vec, map: M, map_err: N) -> Self { + AsyncWriteFut { + data: Some(data), + map, + map_err, + inner, + _act: PhantomData, + } + } +} + impl raw::Message for Message where A: raw::Handler, @@ -48,3 +214,186 @@ where Ok(()) } } + +impl raw::Message for AsyncMessage +where + A: raw::Handler, + M: Send, +{ + type Actor = A; + + fn handle(&mut self, actor: &mut A, ctx: &mut A::Context) -> Result<(), A::Error> { + // If the message hasn't already been handled, + // we do so and return the result. + if let Some(msg) = self.msg.take() { + actor.handle(msg, ctx) + } else { + Ok(()) + } + } +} + +impl raw::AsyncMessageFut for AsyncMessageFut +where + A: raw::Handler + 'static, + F: Future + Unpin + Send, + M: Fn(O) -> T + Unpin + Send, + O: Send + 'static, + T: Send + 'static, +{ + type Actor = A; + + fn poll( + self: Pin<&mut Self>, + ctx: &mut task::Context, + ) -> Poll> { + let fut = self.get_mut(); + let mut inner = if let Some(inner) = fut.inner.get() { + inner + } else { + return Poll::Ready(None); + }; + + match Pin::new(&mut inner).poll(ctx) { + Poll::Ready(output) => { + let msg = (fut.map)(output); + + fut.inner.done(); + Poll::Ready(Some(Box::new(AsyncMessage::new(msg)))) + } + Poll::Pending => { + fut.inner.set(inner); + Poll::Pending + } + } + } +} + +impl raw::AsyncMessageStream for AsyncMessageStream +where + A: raw::Handler + 'static, + S: Stream + Unpin + Send, + M: Fn(I) -> T + Unpin + Send, + I: Send + 'static, + T: Send + 'static, +{ + type Actor = A; + + fn poll_next( + self: Pin<&mut Self>, + ctx: &mut task::Context, + ) -> Poll> { + let stream = self.get_mut(); + let mut inner = if let Some(inner) = stream.inner.get() { + inner + } else { + return Poll::Ready(None); + }; + + match Pin::new(&mut inner).poll_next(ctx) { + Poll::Ready(Some(item)) => { + let msg = (stream.map)(item); + + stream.inner.set(inner); + Poll::Ready(Some(Box::new(AsyncMessage::new(msg)))) + } + Poll::Ready(None) => { + stream.inner.done(); + Poll::Ready(None) + } + Poll::Pending => { + stream.inner.set(inner); + Poll::Pending + } + } + } +} + +impl raw::AsyncReadStream for AsyncReadStream +where + A: raw::Handler + raw::Handler + 'static, + R: AsyncRead + Unpin + Send, + M: Fn(Vec) -> T + Unpin + Send, + N: Fn(io::Error) -> E + Unpin + Send, + T: Send + 'static, + E: Send + 'static, +{ + type Actor = A; + + fn poll_read( + self: Pin<&mut Self>, + ctx: &mut task::Context, + ) -> Poll> { + let stream = self.get_mut(); + let mut inner = if let Some(inner) = stream.inner.get() { + inner + } else { + return Poll::Ready(None); + }; + + match Pin::new(&mut inner).poll_read(ctx, &mut stream.buf) { + Poll::Ready(Ok(read)) => { + let data = stream.buf.drain(0..read).collect(); + stream.buf.resize(stream.cap, 0); + + let msg = (stream.map)(data); + + stream.inner.set(inner); + Poll::Ready(Some(Box::new(AsyncMessage::new(msg)))) + } + Poll::Ready(Err(err)) => { + let msg = (stream.map_err)(err); + + stream.inner.set(inner); + Poll::Ready(Some(Box::new(AsyncMessage::new(msg)))) + } + Poll::Pending => { + stream.inner.set(inner); + Poll::Pending + } + } + } +} + +impl raw::AsyncMessageFut for AsyncWriteFut +where + A: raw::Handler + raw::Handler + 'static, + W: AsyncWrite + Unpin + Send, + M: Fn((Vec, usize), Pin>) -> T + Unpin + Send + Sync, + N: Fn(io::Error) -> E + Unpin + Send + Sync, + T: Send + 'static, + E: Send + 'static, +{ + type Actor = A; + + fn poll( + self: Pin<&mut Self>, + ctx: &mut task::Context, + ) -> Poll> { + let fut = self.get_mut(); + let mut inner = if let Some(inner) = fut.inner.get() { + inner + } else { + return Poll::Ready(None); + }; + + match Pin::new(&mut inner).poll_write(ctx, fut.data.as_ref().unwrap()) { + Poll::Ready(Ok(wrote)) => { + let msg = (fut.map)((fut.data.take().unwrap(), wrote), inner); + + fut.inner.done(); + Poll::Ready(Some(Box::new(AsyncMessage::new(msg)))) + } + Poll::Ready(Err(err)) => { + let msg = (fut.map_err)(err); + + fut.inner.done(); + Poll::Ready(Some(Box::new(AsyncMessage::new(msg)))) + } + Poll::Pending => { + fut.inner.set(inner); + Poll::Pending + } + } + } +} diff --git a/aktoro-context/src/respond.rs b/aktoro-context/src/respond.rs index bc32e1e..cdf956c 100644 --- a/aktoro-context/src/respond.rs +++ b/aktoro-context/src/respond.rs @@ -1,7 +1,7 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::task::Context as FutContext; +use std::task; use std::task::Poll; use std::task::Waker; @@ -53,7 +53,7 @@ impl Respond { impl Future for Respond { type Output = O; - fn poll(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll { + fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll { // If the output is available, we // complete the future with it. if let Some(out) = self.0.out.swap(None) { diff --git a/aktoro-context/src/update.rs b/aktoro-context/src/update.rs index 284bed3..d6f2491 100644 --- a/aktoro-context/src/update.rs +++ b/aktoro-context/src/update.rs @@ -1,5 +1,5 @@ use std::pin::Pin; -use std::task::Context as FutContext; +use std::task; use std::task::Poll; use aktoro_channel as channel; @@ -7,17 +7,29 @@ use aktoro_channel::error::TrySendError; use aktoro_raw as raw; use futures_core::Stream; +/// A wrapper around an actor's status, +/// containing its identifier. +pub struct Update +where + A: raw::Actor, +{ + /// The actor's identifier. + actor_id: u64, + /// The actor's new status. + status: A::Status, +} + /// An actor's update channel sender, used by /// [`Context`]. /// /// [`Context`]: struct.Context.html -pub struct Updater(channel::Sender); +pub struct Updater(channel::Sender>); /// An actor's update channel receiver, used /// by [`Context`]. /// /// [`Context`]: struct.Context.html -pub struct Updated(channel::Receiver); +pub struct Updated(channel::Receiver>); /// Creates a new control channel for the /// specified actor type, returning a sender @@ -35,28 +47,145 @@ pub(crate) fn new() -> (Updater, Updated) { (Updater(sender), Updated(recver)) } -impl raw::Updater for Updater +impl Update +where + A: raw::Actor, +{ + pub(crate) fn new(actor_id: u64, status: A::Status) -> Self { + Update { actor_id, status } + } +} + +impl raw::Status for Update where A: raw::Actor, { + fn starting() -> Self { + Update { + actor_id: !0, + status: A::Status::starting(), + } + } + + fn started() -> Self { + Update { + actor_id: !0, + status: A::Status::started(), + } + } + + fn stopping() -> Self { + Update { + actor_id: !0, + status: A::Status::stopping(), + } + } + + fn stopped() -> Self { + Update { + actor_id: !0, + status: A::Status::stopped(), + } + } + + fn dead() -> Self { + Update { + actor_id: !0, + status: A::Status::dead(), + } + } + + fn is_starting(&self) -> bool { + self.status.is_starting() + } + + fn is_started(&self) -> bool { + self.status.is_started() + } + + fn is_stopping(&self) -> bool { + self.status.is_stopping() + } + + fn is_stopped(&self) -> bool { + self.status.is_stopped() + } + + fn is_dead(&self) -> bool { + self.status.is_dead() + } +} + +impl raw::Update for Update +where + A: raw::Actor, +{ + fn actor_id(&self) -> u64 { + self.actor_id + } + + fn set_actor_id(&mut self, id: u64) { + self.actor_id = id; + } +} + +impl raw::Updater for Updater +where + A: raw::Actor + 'static, +{ + type Update = Update; + type Updated = Updated; - type Error = TrySendError; + type Error = TrySendError>; - fn try_send(&mut self, status: A::Status) -> Result<(), Self::Error> { - self.0.try_send(status) + fn try_send(&mut self, update: Update) -> Result<(), Self::Error> { + self.0.try_send(update) } } -impl raw::Updated for Updated {} +impl raw::Updated> for Updated {} impl Stream for Updated where A: raw::Actor, { - type Item = A::Status; + type Item = Update; - fn poll_next(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll> { + fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll>> { Pin::new(&mut self.get_mut().0).poll_next(ctx) } } + +impl PartialEq for Update +where + A: raw::Actor, +{ + fn eq(&self, other: &Self) -> bool { + self.actor_id == other.actor_id && self.status == other.status + } +} + +impl Default for Update +where + A: raw::Actor, +{ + fn default() -> Self { + Update { + actor_id: !0, + status: A::Status::default(), + } + } +} + +impl Clone for Update +where + A: raw::Actor, +{ + fn clone(&self) -> Self { + Update { + actor_id: self.actor_id, + status: self.status.clone(), + } + } +} diff --git a/aktoro-raw/Cargo.toml b/aktoro-raw/Cargo.toml index 41a6854..8c4fbb8 100644 --- a/aktoro-raw/Cargo.toml +++ b/aktoro-raw/Cargo.toml @@ -7,5 +7,6 @@ authors = ["Matthieu Le Brazidec "] edition = "2018" [dependencies] +crossbeam-utils = "0.6" futures-core-preview = "0.3.0-alpha.17" futures-io-preview = "0.3.0-alpha.17" diff --git a/aktoro-raw/src/action.rs b/aktoro-raw/src/action.rs index 23c69fb..c2ff275 100644 --- a/aktoro-raw/src/action.rs +++ b/aktoro-raw/src/action.rs @@ -10,7 +10,7 @@ pub trait Action: Send { ) -> Result<(), ::Error>; } -pub trait ActionHandler: Actor { +pub trait ActionHandler: Actor { type Output: Send; /// Handles the action, returning a result diff --git a/aktoro-raw/src/actor.rs b/aktoro-raw/src/actor.rs index 6f6a341..a2d7004 100644 --- a/aktoro-raw/src/actor.rs +++ b/aktoro-raw/src/actor.rs @@ -1,13 +1,13 @@ -use std::error::Error as StdError; +use std::error; use crate::context::Context; -pub trait Actor: Unpin + Send + Sized + 'static { +pub trait Actor: Unpin + Send + Sized { type Context: Context; type Status: Status + Unpin; - type Error: StdError + Send; + type Error: error::Error + Send + 'static; #[allow(unused)] /// Called when the actor's context has been created @@ -30,7 +30,7 @@ pub trait Actor: Unpin + Send + Sized + 'static { fn stopped(&mut self, ctx: &mut Self::Context) {} } -pub trait Status: PartialEq + Default + Clone + Send { +pub trait Status: PartialEq + Default + Clone + Unpin + Send { /// Returns the status that an actor should have /// before [`Actor::starting`] is called. /// diff --git a/aktoro-raw/src/channel.rs b/aktoro-raw/src/channel.rs index 5af4365..f2f6f62 100644 --- a/aktoro-raw/src/channel.rs +++ b/aktoro-raw/src/channel.rs @@ -1,4 +1,4 @@ -use std::error::Error as StdError; +use std::error; use futures_core::future::BoxFuture; use futures_core::Stream; @@ -16,17 +16,17 @@ use crate::message::Message; /// [`Sender::try_send`]: trait.Sender.html#method.try_send pub type SenderRes<'s, O, E> = Result>, E>; -pub trait Sender: Clone { +pub trait Sender: Unpin + Clone + Send { type Receiver: Receiver; - type Error: StdError + Send; + type Error: error::Error + Send + 'static; /// Tries to send a message to be handled by the /// actor. fn try_send(&mut self, msg: M) -> SenderRes where A: Handler, - M: Send; + M: Send + 'static; } -pub trait Receiver: Stream>> {} +pub trait Receiver: Stream>> + Unpin + Send {} diff --git a/aktoro-raw/src/context.rs b/aktoro-raw/src/context.rs index 081d3dd..2309937 100644 --- a/aktoro-raw/src/context.rs +++ b/aktoro-raw/src/context.rs @@ -1,4 +1,17 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::task; +use std::task::Poll; +use std::task::Waker; + +use crossbeam_utils::atomic::AtomicCell; use futures_core::Stream; +use futures_io as io; +use futures_io::AsyncRead; +use futures_io::AsyncWrite; use crate::action::Action; use crate::actor::Actor; @@ -6,16 +19,72 @@ use crate::channel::Sender; use crate::control::Controller; use crate::event::Event; use crate::event::EventHandler; +use crate::message::Handler; use crate::message::Message; +use crate::spawned::Spawned; use crate::update::Updater; -pub trait Context: Unpin + Send + 'static + Stream> { +/// A wrapper around a future/stream/reader +/// that is returned by a context after asking +/// it to wait/subscribe/read, to allow to +/// cancel the action. +pub struct Cancellable(CancellableInner); + +/// The structure that is actually holding +/// what [`Cancellable`] needs. It is also +/// what contexts will have to store and +/// update. +/// +/// [`Cancellable`]: struct.Cancellable.html +pub struct CancellableInner { + /// The future/stream/reader that + /// is holded. + inner: Arc>>>>, + /// Whether the action(s) are done or + /// what is holded can be released. + done: Arc, + /// A reference to the space that + /// was assigned to store the + /// [`Cancelling`]'s waker, to wake it + /// up when needed. + /// + /// [`Cancelling`]: struct.Cancelling.html + waker: Arc>>, +} + +/// A future returned by [`Cancellable::cancel`] +/// that resolves when what is handled has +/// been cancelled (in which case it returns +/// it) or the action(s) are done. +pub struct Cancelling { + /// The future/stream/reader that is + /// holded. + inner: Arc>>>>, + /// Whether the action(s) are done or + /// what is holded can be released. + done: Arc, + /// A reference to the space that + /// was assigned to store the + /// [`Cancelling`]'s waker, to wake it + /// up when needed. + /// + /// [`Cancelling`]: struct.Cancelling.html + waker: Arc>>, +} + +pub trait Context: Stream> + Unpin + Send + Sized { + type Config: Default; + type Controller: Controller; type Sender: Sender; type Updater: Updater; - /// Creates a new context for an actor. - fn new() -> Self; + /// Creates a new context with the provided + /// config and an identifier for the actor. + fn new(actor_id: u64, config: Self::Config) -> Self; + + /// Returns the actor's identifier. + fn actor_id(&self) -> u64; /// Emits an event that will be handled by the /// actor. @@ -61,6 +130,158 @@ pub trait Context: Unpin + Send + 'static + Stream> { /// Gets a mutable reference to the actors's /// update channel receiver. fn updater(&mut self) -> &mut Self::Updater; + + /// Returns a list of the context's inner + /// runtime's actors' identifier. + fn actors(&self) -> Vec; + + /// Spawns a sub-actor on the context's inner + /// runtime. + /// + /// ## Note + /// + /// The new actor must have a context with the + /// same configuration structure as this context. + fn spawn(&mut self, actor: S) -> Option> + where + S: Actor + 'static, + C: Context; + + /// Waits for a future to yield before mapping it + /// to a message and passing it to the actor. + /// + /// The execution can be cancelled using the + /// returned [`Cancellable`]. Cancelling the + /// execution, if it isn't done, will return + /// the original `fut`. + /// + /// [`Cancellable`]: struct.Cancellable.html + fn wait(&mut self, fut: Pin>, map: M) -> Cancellable + where + F: Future + Unpin + Send + 'static, + M: Fn(O) -> T + Unpin + Send + Sync + 'static, + A: Handler, + O: Send + 'static, + T: Send + 'static; + + /// Waits for a future to yield before mapping it + /// to a message and passing it to the actor. + /// + /// Until all the blocking futures/asynchronous writes + /// have yielded, no messages, events, streams, etc. + /// will be handled by the context. + /// + /// The execution can be cancelled using the + /// returned [`Cancellable`]. Cancelling the + /// execution, if it isn't done, will return + /// the original `fut`. + /// + /// [`Cancellable`]: struct.Cancellable.html + fn blocking_wait(&mut self, fut: Pin>, map: M) -> Cancellable + where + F: Future + Unpin + Send + 'static, + M: Fn(O) -> T + Unpin + Send + Sync + 'static, + A: Handler, + O: Send + 'static, + T: Send + 'static; + + /// Forwards the items yielded by a stream to + /// the actor after mapping them to a message. + /// + /// The execution can be cancelled using the + /// returned [`Cancellable`]. Cancelling the + /// execution, if it isn't done, will return + /// the original `fut`. + /// + /// [`Cancellable`]: struct.Cancellable.html + fn subscribe(&mut self, stream: Pin>, map: M) -> Cancellable + where + S: Stream + Unpin + Send + 'static, + M: Fn(I) -> T + Unpin + Send + Sync + 'static, + A: Handler, + I: Send + 'static, + T: Send + 'static; + + /// Forwards the received data to the actor + /// after either mapping it or a returned + /// error to a message. + /// + /// The execution can be cancelled using the + /// returned [`Cancellable`]. Cancelling the + /// execution, if it isn't done, will return + /// the original `fut`. + /// + /// [`Cancellable`]: struct.Cancellable.html + fn read( + &mut self, + read: Pin>, + cap: usize, + map: M, + map_err: N, + ) -> Cancellable + where + R: AsyncRead + Unpin + Send + 'static, + M: Fn(Vec) -> T + Unpin + Send + Sync + 'static, + N: Fn(io::Error) -> E + Unpin + Send + Sync + 'static, + A: Handler + Handler, + T: Send + 'static, + E: Send + 'static; + + /// Waits for data to be written over an asynchronous + /// writer, then passing a message returned by either + /// `map` or `map_err` (depending on whether an error + /// was returned by the writer) to the actor. + /// + /// The execution can be cancelled using the + /// returned [`Cancellable`]. Cancelling the + /// execution, if it isn't done, will return + /// the original `fut`. + /// + /// [`Cancellable`]: struct.Cancellable.html + fn write( + &mut self, + write: Pin>, + data: Vec, + map: M, + map_err: N, + ) -> Cancellable + where + W: AsyncWrite + Unpin + Send + 'static, + M: Fn((Vec, usize), Pin>) -> T + Unpin + Send + Sync + 'static, + N: Fn(io::Error) -> E + Unpin + Send + Sync + 'static, + A: Handler + Handler, + T: Send + 'static, + E: Send + 'static; + + /// Waits for data to be written over an asynchronous + /// writer, then passing a message returned by either + /// `map` or `map_err` (depending on whether an error + /// was returned by the writer) to the actor. + /// + /// Until all the blocking futures/asynchronous writes + /// have yielded, no messages, events, streams, etc. + /// will be handled by the context. + /// + /// The execution can be cancelled using the + /// returned [`Cancellable`]. Cancelling the + /// execution, if it isn't done, will return + /// the original `fut`. + /// + /// [`Cancellable`]: struct.Cancellable.html + fn blocking_write( + &mut self, + write: Pin>, + data: Vec, + map: M, + map_err: N, + ) -> Cancellable + where + W: AsyncWrite + Unpin + Send + 'static, + M: Fn((Vec, usize), Pin>) -> T + Unpin + Send + Sync + 'static, + N: Fn(io::Error) -> E + Unpin + Send + Sync + 'static, + A: Handler + Handler, + T: Send + 'static, + E: Send + 'static; } pub enum Work { @@ -80,3 +301,94 @@ pub enum Work { /// changed. Update, } + +impl Cancellable { + pub fn new(inner: Pin>) -> (Self, CancellableInner) { + let inner = Arc::new(AtomicCell::new(Some(inner))); + let done = Arc::new(AtomicBool::new(false)); + let waker = Arc::new(AtomicCell::new(None)); + + ( + Cancellable(CancellableInner { + inner: inner.clone(), + done: done.clone(), + waker: waker.clone(), + }), + CancellableInner { inner, done, waker }, + ) + } + + /// Creates a future that will yield when + /// the action(s) have either been cancelled, + /// in which case it will also give back what + /// was holded, or are done. + pub fn cancel(self) -> Cancelling { + Cancelling { + inner: self.0.inner, + done: self.0.done, + waker: self.0.waker, + } + } +} + +impl CancellableInner { + /// Gets what the wrapper holds. + /// + /// ## Note + /// + /// If the action(s) aren't done, you need + /// to call [`set`]. + /// + /// [`set`]: #method.set + pub fn get(&self) -> Option>> { + self.inner.swap(None) + } + + /// Sets what the wrapper holds to `inner`. + pub fn set(&self, inner: Pin>) { + self.inner.store(Some(inner)); + + // We eventually wake up the future + // that wants to get what's holded back. + if let Some(waker) = self.waker.swap(None) { + waker.wake(); + } + } + + /// Sets the action(s) as done. + pub fn done(&self) { + self.inner.store(None); + self.done.store(true, Ordering::SeqCst); + + // We eventually wake up the future + // that wanted to get what's holded back, + // so that it can know that it wont be + // able to. + if let Some(waker) = self.waker.swap(None) { + waker.wake(); + } + } +} + +impl Future for Cancelling { + type Output = Option>>; + + fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll>>> { + if self.done.load(Ordering::SeqCst) { + return Poll::Ready(None); + } + + match self.inner.swap(None) { + Some(inner) => { + self.waker.store(None); + + Poll::Ready(Some(inner)) + } + None => { + self.waker.store(Some(ctx.waker().clone())); + + Poll::Pending + } + } + } +} diff --git a/aktoro-raw/src/control.rs b/aktoro-raw/src/control.rs index f8f6d23..c2c05fd 100644 --- a/aktoro-raw/src/control.rs +++ b/aktoro-raw/src/control.rs @@ -1,4 +1,4 @@ -use std::error::Error as StdError; +use std::error; use futures_core::future::BoxFuture; use futures_core::Stream; @@ -16,10 +16,10 @@ use crate::actor::Actor; /// [`Controller::try_send`]: trait.Controller.html#method.try_send pub type ControllerRes<'c, O, E> = Result>, E>; -pub trait Controller: Clone { +pub trait Controller: Unpin + Clone + Send { type Controlled: Controlled; - type Error: StdError + Send; + type Error: error::Error + Send + 'static; /// Tries to send an action to be handled by the /// actor. @@ -29,4 +29,4 @@ pub trait Controller: Clone { D: Send + 'static; } -pub trait Controlled: Stream>> {} +pub trait Controlled: Stream>> + Unpin + Send {} diff --git a/aktoro-raw/src/event.rs b/aktoro-raw/src/event.rs index f473ff8..cc4aa1b 100644 --- a/aktoro-raw/src/event.rs +++ b/aktoro-raw/src/event.rs @@ -10,7 +10,7 @@ pub trait Event: Send { ) -> Result<(), ::Error>; } -pub trait EventHandler: Actor { +pub trait EventHandler: Actor { /// Handles the event. fn handle(&mut self, event: E, ctx: &mut Self::Context) -> Result<(), Self::Error>; } diff --git a/aktoro-raw/src/lib.rs b/aktoro-raw/src/lib.rs index 4e86814..ee6b6f4 100644 --- a/aktoro-raw/src/lib.rs +++ b/aktoro-raw/src/lib.rs @@ -5,6 +5,7 @@ mod context; mod control; mod event; mod message; +mod net; mod runtime; mod spawned; mod tcp; @@ -18,10 +19,9 @@ pub use crate::context::*; pub use crate::control::*; pub use crate::event::*; pub use crate::message::*; +pub use crate::net::*; pub use crate::runtime::*; pub use crate::spawned::*; pub use crate::tcp::*; pub use crate::udp::*; pub use crate::update::*; - -pub use futures_core::future::BoxFuture; diff --git a/aktoro-raw/src/message.rs b/aktoro-raw/src/message.rs index 013ff5c..99fc834 100644 --- a/aktoro-raw/src/message.rs +++ b/aktoro-raw/src/message.rs @@ -1,5 +1,11 @@ +use std::pin::Pin; +use std::task; +use std::task::Poll; + use crate::actor::Actor; +pub type AsyncMessageRet = Option>>; + pub trait Message: Send { type Actor: Actor; @@ -10,10 +16,42 @@ pub trait Message: Send { ) -> Result<(), ::Error>; } -pub trait Handler: Actor { +pub trait AsyncMessageFut: Send { + type Actor: Actor; + + fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll>; +} + +pub trait AsyncMessageStream: Send { + type Actor: Actor; + + fn poll_next( + self: Pin<&mut Self>, + ctx: &mut task::Context, + ) -> Poll>; +} + +pub trait AsyncReadStream: Send { + type Actor: Actor; + + fn poll_read( + self: Pin<&mut Self>, + ctx: &mut task::Context, + ) -> Poll>; +} + +pub trait Handler: Actor { type Output: Send; /// Handles the message, returning a result /// eventually containing the message's output. fn handle(&mut self, msg: M, ctx: &mut Self::Context) -> Result; } + +impl Handler<()> for A { + type Output = (); + + fn handle(&mut self, _: (), _: &mut Self::Context) -> Result<(), Self::Error> { + Ok(()) + } +} diff --git a/aktoro-raw/src/net.rs b/aktoro-raw/src/net.rs new file mode 100644 index 0000000..89eed4a --- /dev/null +++ b/aktoro-raw/src/net.rs @@ -0,0 +1,51 @@ +use std::net::ToSocketAddrs; + +use crate::tcp::TcpClient; +use crate::tcp::TcpServer; +use crate::udp::UdpSocket; + +type TcpConnectRes = Result<::Connect, ::Error>; + +type TcpBindRes = Result::Error>; + +type UdpBindRes = Result::Error>; + +pub trait NetworkManager: Unpin + Send { + /// The type of the TCP socket client + /// that actors can use to be compatible + /// with the runtime (this might not be + /// necessary depending on the runtime + /// implementation). + type TcpClient: TcpClient; + + /// The type of the TCP socket server + /// that actors can use to be compatible + /// with the runtime (this might not be + /// necessary depending on the runtime + /// implementation). + type TcpServer: TcpServer; + + /// The type of UDP socket that actors + /// can use to be compatible with the + /// runtime (this might not be necessary + /// depending on the runtime implementation). + type UdpSocket: UdpSocket; + + /// Tries to connect to a TCP server at the + /// given address. + fn tcp_connect(&self, addr: A) -> TcpConnectRes { + Self::TcpClient::connect(addr) + } + + /// Tries to create a new TCP server that + /// will be bound to the given address. + fn tcp_bind(&self, addr: A) -> TcpBindRes { + Self::TcpServer::bind(addr) + } + + /// Tries to create a new UDP socket that + /// will be bound to the given address. + fn udp_bind(&self, addr: A) -> UdpBindRes { + Self::UdpSocket::bind(addr) + } +} diff --git a/aktoro-raw/src/runtime.rs b/aktoro-raw/src/runtime.rs index 4af1d2a..281183c 100644 --- a/aktoro-raw/src/runtime.rs +++ b/aktoro-raw/src/runtime.rs @@ -1,48 +1,41 @@ -use std::error::Error as StdError; -use std::future::Future; +use std::error; + +use futures_core::Stream; use crate::actor::Actor; +use crate::context::Context; +use crate::net::NetworkManager; use crate::spawned::Spawned; -use crate::tcp::TcpClient; -use crate::tcp::TcpServer; -use crate::udp::UdpSocket; - -pub trait Runtime { - /// The type of the TCP socket client - /// that actors can use to be compatible - /// with the runtime (this might not be - /// necessary depending on the runtime - /// implementation). - type TcpClient: TcpClient; - /// The type of the TCP socket server - /// that actors can use to be compatible - /// with the runtime (this might not be - /// necessary depending on the runtime - /// implementation). - type TcpServer: TcpServer; +pub trait Wait: Stream> + Unpin + Send { + /// Returns a reference to the runtime. + fn runtime(&self) -> &R; - /// The type of UDP socket that actors - /// can use to be compatible with the - /// runtime (this might not be necessary - /// depending on the runtime implementation). - type UdpSocket: UdpSocket; + /// Returns the runtime, consuming the + /// stream. + fn into_runtime(self) -> R; +} - /// The future returned after calling the - /// [`stop`] method. It will resolve after - /// all the actors have been stopped. - /// - /// [`stop`]: #method.stop - type Stop: Future>; +pub trait Runtime: Default + Unpin + Send { + /// The type that is handling the types of + /// the TCP socket client and server and + /// of the UDP socket that actors can use + /// to be compatible with the runtime (this + /// might not be necessary depending on the + /// runtime implementation). + type NetworkManager: NetworkManager; - /// The future returned after calling the - /// [`wait`] method. It will resolve after - /// all the actors have been stopped. + /// The type that is allowing the runtime to + /// be polled after calling [`wait`]. /// /// [`wait`]: #method.wait - type Wait: Future>; + type Wait: Wait; - type Error: StdError; + type Error: error::Error + Send + 'static; + + /// Returns a list of the runtime's actors' + /// identifier. + fn actors(&self) -> Vec; /// Spawns a new actor on the runtime, /// returning [`Some(Spawned)`] if it @@ -52,15 +45,39 @@ pub trait Runtime { /// /// [`Some(Spawned)`]: sturct.Spawned.html /// [`Actor::starting`]: trait.Actor.html#method.starting - fn spawn(&mut self, actor: A) -> Option>; + fn spawn(&mut self, actor: A) -> Option> + where + A: Actor + 'static; + + /// Spawns a new actor on the runtime, + /// passing its context the provided config + /// and returning [`Some(Spawned)`] if it + /// succeeded or [`None`] if it failed or + /// if the actor stopped itself when + /// [`Actor::starting`] was called. + /// + /// [`Some(Spawned)`]: sturct.Spawned.html + /// [`Actor::starting`]: trait.Actor.html#method.starting + fn spawn_with(&mut self, actor: A, config: C::Config) -> Option> + where + A: Actor + 'static, + C: Context; - /// Asks to all the actors managed by the - /// runtime to stop, returning a future - /// resolving after all of them have been - /// stopped. - fn stop(self) -> Self::Stop; + /// Creates a new network manager, that + /// can then be used by an actor to + /// create a new TCP client, server or + /// an UDP socket. + fn net(&mut self) -> Self::NetworkManager; - /// Waits for all the actors to be stopped, - /// returning a future waiting for it. + /// Returns a stream allowing to poll the + /// runtime's actors. + /// + /// ## Note + /// + /// The stream can be transformed back into + /// a runtime. fn wait(self) -> Self::Wait; + + /// Asks all the runtime's actors to stop. + fn stop(&mut self); } diff --git a/aktoro-raw/src/spawned.rs b/aktoro-raw/src/spawned.rs index 4bd8051..012af14 100644 --- a/aktoro-raw/src/spawned.rs +++ b/aktoro-raw/src/spawned.rs @@ -1,5 +1,5 @@ use std::pin::Pin; -use std::task::Context as FutContext; +use std::task; use std::task::Poll; use futures_core::Stream; @@ -20,21 +20,28 @@ type SenderError = as RawSender>::Error; type Controller = <::Context as Context>::Controller; type ControllerError = as RawController>::Error; +type Update = <<::Context as Context>::Updater as Updater>::Update; type Updated = <<::Context as Context>::Updater as Updater>::Updated; /// A wrapper around an actor's /// message, control and update /// channels. pub struct Spawned { + /// The actor's message channel's + /// sender. sender: Sender, + /// The actor's control channel's + /// sender. ctrler: Controller, + /// The actor's update channel's + /// receiver. updted: Option>, } impl Spawned { /// Creates a new `Spawned` struct from an actor's /// context. - pub fn new(ctx: &mut A::Context) -> Spawned { + pub fn new(ctx: &mut A::Context) -> Self { Spawned { sender: ctx.sender().clone(), ctrler: ctx.controller().clone(), @@ -52,7 +59,7 @@ impl Spawned { pub fn try_send_msg(&mut self, msg: M) -> SenderRes> where A: Handler, - M: Send, + M: Send + 'static, { self.sender.try_send(msg) } @@ -96,14 +103,10 @@ impl Spawned { impl Unpin for Spawned {} -impl Stream for Spawned -where - A: Actor, - Updated: Unpin, -{ - type Item = A::Status; +impl Stream for Spawned { + type Item = Update; - fn poll_next(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll> { + fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll>> { if let Some(updted) = &mut self.get_mut().updted { Pin::new(updted).poll_next(ctx) } else { @@ -112,10 +115,7 @@ where } } -impl Clone for Spawned -where - A: Actor, -{ +impl Clone for Spawned { fn clone(&self) -> Self { Spawned { sender: self.sender.clone(), diff --git a/aktoro-raw/src/tcp.rs b/aktoro-raw/src/tcp.rs index d9fd955..6cae86a 100644 --- a/aktoro-raw/src/tcp.rs +++ b/aktoro-raw/src/tcp.rs @@ -1,4 +1,4 @@ -use std::error::Error as StdError; +use std::error; use std::future::Future; use std::net::SocketAddr; use std::net::ToSocketAddrs; @@ -7,22 +7,30 @@ use futures_core::Stream; use futures_io::AsyncRead; use futures_io::AsyncWrite; -pub type TcpServerIncoming<'s, S, E> = Box> + 's>; +pub type TcpServerIncoming<'s, S> = Box< + dyn Stream::Stream, ::Error>> + + Unpin + + Send + + 's, +>; +pub type OwnedTcpServerIncoming = Box< + dyn Stream::Stream, ::Error>> + Unpin + Send, +>; -pub trait TcpClient: TcpStream + Sized { - type Connect: Future::Error>>; +pub trait TcpClient: TcpStream + Unpin + Send + Sized { + type Connect: Future::Error>> + Unpin + Send; - type Error: StdError; + type Error: error::Error + Send + 'static; /// Tries to connect to a TCP server at the /// given address. fn connect(addr: A) -> Result::Error>; } -pub trait TcpServer: Sized { +pub trait TcpServer: Unpin + Send + Sized { type Stream: TcpStream; - type Error: StdError; + type Error: error::Error + Send + 'static; /// Tries to create a new TCP server that /// will be bound to the given address. @@ -33,11 +41,15 @@ pub trait TcpServer: Sized { fn local_addr(&self) -> Result; /// Returns a stream of incoming connections. - fn incoming(&mut self) -> Result, Self::Error>; + fn incoming(&mut self) -> Result, Self::Error>; + + /// Returns a stream of incoming connections, + /// consuming the server. + fn into_incoming(self) -> Result, Self::Error>; } -pub trait TcpStream: AsyncRead + AsyncWrite { - type Error: StdError; +pub trait TcpStream: AsyncRead + AsyncWrite + Unpin + Send { + type Error: error::Error + Send + 'static; /// Returns the address that the server /// is bound to. diff --git a/aktoro-raw/src/udp.rs b/aktoro-raw/src/udp.rs index 8e14ac3..8063d0c 100644 --- a/aktoro-raw/src/udp.rs +++ b/aktoro-raw/src/udp.rs @@ -1,4 +1,4 @@ -use std::error::Error as StdError; +use std::error; use std::net::SocketAddr; use std::net::ToSocketAddrs; @@ -8,8 +8,8 @@ pub type UdpSocketSendTo<'s, E> = Box> + 's pub type UdpSocketRecv<'s, E> = Box> + 's>; -pub trait UdpSocket: Sized { - type Error: StdError; +pub trait UdpSocket: Unpin + Send + Sized { + type Error: error::Error + Send + 'static; /// Tries to create a new UDP socket that /// will be bound to the given address. diff --git a/aktoro-raw/src/update.rs b/aktoro-raw/src/update.rs index 66d0ccc..758dd96 100644 --- a/aktoro-raw/src/update.rs +++ b/aktoro-raw/src/update.rs @@ -1,17 +1,27 @@ -use std::error::Error as StdError; +use std::error; use futures_core::Stream; use crate::actor::Actor; +use crate::actor::Status; -pub trait Updater { - type Updated: Updated; +pub trait Update: Status + Unpin + Send { + fn actor_id(&self) -> u64; - type Error: StdError + Send; + fn set_actor_id(&mut self, id: u64); +} + +pub trait Updater: Unpin + Send { + type Update: Update; + + type Updated: Updated; + + type Error: error::Error + Send + 'static; - /// Tries to send a status update over - /// the actor's update channel. - fn try_send(&mut self, status: A::Status) -> Result<(), Self::Error>; + /// Tries to send an update to be handled by + /// whatever is holding the update channel's + /// receiver. + fn try_send(&mut self, update: Self::Update) -> Result<(), Self::Error>; } -pub trait Updated: Stream {} +pub trait Updated: Stream + Unpin + Send {} diff --git a/aktoro-runtime/src/actor.rs b/aktoro-runtime/src/actor.rs index 1e199a8..ae72043 100644 --- a/aktoro-runtime/src/actor.rs +++ b/aktoro-runtime/src/actor.rs @@ -1,6 +1,6 @@ use std::future::Future; use std::pin::Pin; -use std::task::Context as FutContext; +use std::task; use std::task::Poll; use aktoro_channel as channel; @@ -26,7 +26,7 @@ pub(crate) struct Actor { killed: KilledSender, } -#[derive(Eq, PartialEq, Debug, Clone)] +#[derive(PartialEq, Debug, Clone)] /// A default implementation for the /// [`aktoro-raw::Status`] trait. /// @@ -139,19 +139,22 @@ impl Actor { // dead. self.ctx.set_status(A::Status::dead()); - // We try to push the actor's - // new status over its update - // channel. - if let Err(err) = self.ctx.update() { - return Err(Box::new(err).into()); - } - // We try to notify the actor's // death over the killed channel. if let Err(err) = self.killed.killed(self.id) { return Err(Box::new(err).into()); } + // We try to push the actor's + // new status over its update + // channel. + // NOTE: this is done after sending + // the death notification because + // if the `Spanwed` linked to + // this actor has been dropped, + // it will return an error. + // TODO: should we take care of the possible errors? + self.ctx.update().ok(); Ok(()) } } @@ -216,10 +219,13 @@ impl KilledSender { } } -impl Future for Actor { +impl Future for Actor +where + A: raw::Actor + 'static, +{ type Output = Result<(), Error>; - fn poll(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll { + fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll { let actor = self.get_mut(); loop { @@ -324,7 +330,7 @@ impl Future for Actor { impl Future for KillRecver { type Output = (); - fn poll(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll<()> { + fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll<()> { let recver = self.get_mut(); if let Some(notify) = &mut recver.0 { @@ -342,7 +348,7 @@ impl Future for KillRecver { impl Stream for KilledRecver { type Item = u64; - fn poll_next(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll> { + fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll> { Pin::new(&mut self.get_mut().0).poll_next(ctx) } } diff --git a/aktoro-runtime/src/error.rs b/aktoro-runtime/src/error.rs index f0fb8a6..bed48ec 100644 --- a/aktoro-runtime/src/error.rs +++ b/aktoro-runtime/src/error.rs @@ -1,4 +1,4 @@ -use std::error::Error as StdError; +use std::error; use std::fmt; use std::fmt::Display; use std::fmt::Formatter; @@ -10,30 +10,20 @@ pub struct Error { #[derive(Debug)] pub enum ErrorKind { - /// The actor has already been removed from - /// the runtime's actors list. - AlreadyRemoved(u64), /// Returns a `Box` contaning any error type /// that implements the [`Error`] trait. /// /// [`Error`]: https://doc.rust-lang.org/std/error/trait.Error.html - Std(Box), + Std(Box), /// Multiple errors occured. Multiple(Vec), } impl Error { - /// Creates a new "already removed" error. - pub(crate) fn already_removed(id: u64) -> Self { - Error { - kind: ErrorKind::AlreadyRemoved(id), - } - } - /// Creates a new boxed error. pub(crate) fn std(err: S) -> Self where - S: StdError + Send + 'static, + S: error::Error + Send + 'static, { Error { kind: ErrorKind::Std(Box::new(err)), @@ -88,17 +78,6 @@ impl Error { } } - /// Whether the error occured because the actor - /// was already removed from the runtime's actors - /// list. - pub fn is_already_removed(&self) -> bool { - if let ErrorKind::AlreadyRemoved(_) = self.kind { - true - } else { - false - } - } - /// Whether multiple errors occured. pub fn is_multiple(&self) -> bool { if let ErrorKind::Multiple(_) = self.kind { @@ -120,14 +99,11 @@ impl Error { } } -impl StdError for Error {} +impl error::Error for Error {} impl Display for Error { fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { match &self.kind { - ErrorKind::AlreadyRemoved(id) => { - write!(fmt, "actor ({}) already removed from list", id,) - } ErrorKind::Std(err) => write!(fmt, "{}", err), ErrorKind::Multiple(_) => write!(fmt, "multiple errors",), } @@ -142,7 +118,7 @@ impl From for Error { impl From> for Error where - S: StdError + Send + 'static, + S: error::Error + Send + 'static, { fn from(err: Box) -> Error { Error { diff --git a/aktoro-runtime/src/lib.rs b/aktoro-runtime/src/lib.rs index 8a38546..ef04cb4 100644 --- a/aktoro-runtime/src/lib.rs +++ b/aktoro-runtime/src/lib.rs @@ -2,12 +2,14 @@ mod actor; mod error; +mod net; mod runtime; mod tcp; mod udp; pub use crate::actor::Status; pub use crate::error::Error; +pub use crate::net::NetworkManager; pub use crate::runtime::Runtime; pub use crate::tcp::TcpClient; pub use crate::tcp::TcpServer; diff --git a/aktoro-runtime/src/net.rs b/aktoro-runtime/src/net.rs new file mode 100644 index 0000000..9d8c35d --- /dev/null +++ b/aktoro-runtime/src/net.rs @@ -0,0 +1,14 @@ +use aktoro_raw as raw; + +use crate::tcp::TcpClient; +use crate::tcp::TcpServer; +use crate::udp::UdpSocket; + +pub struct NetworkManager; + +impl raw::NetworkManager for NetworkManager { + type TcpClient = TcpClient; + type TcpServer = TcpServer; + + type UdpSocket = UdpSocket; +} diff --git a/aktoro-runtime/src/runtime.rs b/aktoro-runtime/src/runtime.rs index 7343734..bdc7fa6 100644 --- a/aktoro-runtime/src/runtime.rs +++ b/aktoro-runtime/src/runtime.rs @@ -1,10 +1,10 @@ use std::future::Future; use std::pin::Pin; -use std::task::Context as FutContext; +use std::task; use std::task::Poll; use aktoro_raw as raw; -use aktoro_raw::Context as RawContext; +use aktoro_raw::Runtime as RawRuntime; use fnv::FnvHashMap; use futures_core::Stream; use rand::FromEntropy; @@ -18,9 +18,7 @@ use crate::actor::KillSender as Kill; use crate::actor::KilledRecver; use crate::actor::KilledSender; use crate::error::Error; -use crate::tcp::TcpClient; -use crate::tcp::TcpServer; -use crate::udp::UdpSocket; +use crate::net::NetworkManager; /// An actor runtime using the [`runtime`] crate. /// @@ -37,25 +35,20 @@ pub struct Runtime { /// A receiver the the actors' killed /// channel, notified when an actor /// has stopped/been killed. + /// + /// It is shared among all the runtime's + /// actors. recver: KilledRecver, /// A fast (non-cryptographic) random /// number generator. rng: Xoshiro512StarStar, } -/// A future that resolves when all the -/// runtime's actors have been stopped. -pub struct Stop(Wait); - -/// A future that resolves when all the -/// runtime's actors have been stopped. -pub struct Wait { - rt: Runtime, - /// Contains a list of all the errors - /// that happened while waiting for - /// the actors to stop. - errors: Vec, -} +/// The stream returned by [`Runtime::wait`] +/// that allows to poll its actors. +/// +/// [`Runtime::wait`]: struct.Runtime.html#method.wait +pub struct Wait(Runtime); impl Runtime { /// Creates a new `Runtime`. @@ -65,27 +58,38 @@ impl Runtime { } impl raw::Runtime for Runtime { - type TcpClient = TcpClient; - type TcpServer = TcpServer; + type NetworkManager = NetworkManager; - type UdpSocket = UdpSocket; - - type Stop = Stop; type Wait = Wait; type Error = Error; - fn spawn(&mut self, actor: A) -> Option> { + fn actors(&self) -> Vec { + self.actors.keys().copied().collect() + } + + fn spawn(&mut self, actor: A) -> Option> + where + A: raw::Actor + 'static, + { + self.spawn_with(actor, Default::default()) + } + + fn spawn_with(&mut self, actor: A, config: C::Config) -> Option> + where + A: raw::Actor + 'static, + C: raw::Context, + { + // Generate the actor's ID. + let id = self.rng.next_u64(); + // Create a new context for the actor. - let mut ctx = A::Context::new(); + let mut ctx = C::new(id, config); // Create a new `Spawned` struct from // the actor's context. let spawned = raw::Spawned::new(&mut ctx); - // Generate the actor's ID. - let id = self.rng.next_u64(); - // Create the actor's kill channel. let (sender, recver) = actor::new_kill(); @@ -103,109 +107,83 @@ impl raw::Runtime for Runtime { Some(spawned) } - /// Asks to all the actors managed by the - /// runtime to stop, returning a future - /// resolving after all of them have been - /// stopped. - /// - /// ## Note - /// - /// Calling this method and polling the - /// returned future might be required to - /// poll the actors a first time, making - /// this method kind of useless if that's - /// the case. - fn stop(mut self) -> Stop { - // Ask for each actor to stop. - for (_, actor) in self.actors.iter_mut() { - actor.0.kill(); - } - - Stop(self.wait()) + fn net(&mut self) -> NetworkManager { + NetworkManager } - /// Waits for all the actors to be stopped, - /// returning a future waiting for it. - /// - /// ## Note - /// - /// Calling this method and polling the - /// returned future might be required to - /// poll the actors a first time. fn wait(self) -> Wait { - Wait { - rt: self, - errors: vec![], + Wait(self) + } + + fn stop(&mut self) { + // Ask to every actor to stop. + for (_, actor) in self.actors.iter_mut() { + actor.0.kill(); } } } -impl Future for Stop { - type Output = Result<(), Error>; +impl raw::Wait for Wait { + fn runtime(&self) -> &Runtime { + &self.0 + } - fn poll(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll> { - // `Stop` is just a wrapper - // arround `Wait` (what differs - // is what happens before it - // is returned by the `stop` - // method). - Pin::new(&mut self.get_mut().0).poll(ctx) + fn into_runtime(self) -> Runtime { + self.0 } } -impl Future for Wait { - type Output = Result<(), Error>; +impl Stream for Wait { + type Item = Result; - fn poll(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll> { - let wait = self.get_mut(); - let rt = &mut wait.rt; + fn poll_next( + self: Pin<&mut Self>, + ctx: &mut task::Context, + ) -> Poll>> { + let rt = &mut self.get_mut().0; - loop { - if rt.actors.is_empty() { - return Poll::Ready(Ok(())); - } + if rt.actors.is_empty() { + return Poll::Ready(None); + } - // We poll all the actors' handle. - let mut remove = vec![]; - for (id, act) in rt.actors.iter_mut() { - if let Poll::Ready(res) = Pin::new(&mut act.1).poll(ctx) { - remove.push(*id); + // We poll all the runtime's actors until + // one yields. + let mut remove = None; + for (id, act) in rt.actors.iter_mut() { + if let Poll::Ready(res) = Pin::new(&mut act.1).poll(ctx) { + remove = Some((*id, res)); - if let Err(err) = res { - wait.errors.push(err); - } - } + break; } + } + + // If an actor yielded, we remove it from + // the actors list and yield what's been + // yielded. + if let Some((id, res)) = remove { + let removed = rt.actors.remove(&id); - // We remove the dead actors. - for actor in remove { - if rt.actors.remove(&actor).is_none() { - wait.errors.push(Error::already_removed(actor)); - } + match (removed, res) { + (Some(_), Err(err)) => return Poll::Ready(Some(Err((id, err)))), + (None, Err(err)) => return Poll::Ready(Some(Err((id, Error::std(err))))), + _ => return Poll::Ready(Some(Ok(id))), } + } + + // We try to receive the identifier of the + // dead actors via the killed channel, to + // remove them and yield an update. + match Pin::new(&mut rt.recver).poll_next(ctx) { + Poll::Ready(Some(actor)) => { + rt.actors.remove(&actor); - // We try to poll from the actors' - // kill channel's receiver. - match Pin::new(&mut rt.recver).poll_next(ctx) { - Poll::Ready(Some(actor)) => { - if rt.actors.remove(&actor).is_none() { - wait.errors.push(Error::already_removed(actor)); - } - } - // If the channel has been closed, - // we stop the future. - Poll::Ready(None) => { - if wait.errors.len() > 1 { - return Poll::Ready(Err(Error::multiple(wait.errors.split_off(0)))); - } else if wait.errors.len() == 1 { - return Poll::Ready(Err(wait.errors.pop().unwrap())); - } else { - return Poll::Ready(Ok(())); - } - } - Poll::Pending => return Poll::Pending, + return Poll::Ready(Some(Ok(actor))); } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => (), } + + Poll::Pending } } @@ -221,3 +199,9 @@ impl Default for Runtime { } } } + +impl Drop for Runtime { + fn drop(&mut self) { + self.stop() + } +} diff --git a/aktoro-runtime/src/tcp.rs b/aktoro-runtime/src/tcp.rs index eaf2376..9318ab8 100644 --- a/aktoro-runtime/src/tcp.rs +++ b/aktoro-runtime/src/tcp.rs @@ -2,14 +2,14 @@ use std::future::Future; use std::net::SocketAddr; use std::net::ToSocketAddrs; use std::pin::Pin; -use std::task::Context as FutContext; +use std::task; use std::task::Poll; use aktoro_raw as raw; use futures_core::Stream; +use futures_io as io; use futures_io::AsyncRead; use futures_io::AsyncWrite; -use futures_io::Error as FutError; use runtime::net; use crate::error::Error; @@ -50,6 +50,20 @@ pub struct TcpIncoming<'i> { incoming: net::tcp::IncomingStream<'i>, } +/// A stream that yields new TCP +/// connections. +/// +/// ## Note +/// +/// It is similar to [`TcpIncoming`] +/// but because it actually holds the +/// tcp server it doesn't have lifetime +/// issues. +pub struct OwnedTcpIcoming { + /// The tcp server. + server: TcpServer, +} + /// A TCP stream, owned by a server, /// and allowing it to communicate with /// a client. @@ -89,13 +103,19 @@ impl raw::TcpServer for TcpServer { } } - fn incoming<'s>( - &'s mut self, - ) -> Result> + 's>, Error> { + fn incoming<'i>( + &'i mut self, + ) -> Result> + Unpin + Send + 'i>, Error> { Ok(Box::new(TcpIncoming { incoming: self.listener.incoming(), })) } + + fn into_incoming( + self, + ) -> Result> + Unpin + Send>, Error> { + Ok(Box::new(OwnedTcpIcoming { server: self })) + } } impl raw::TcpStream for TcpClient { @@ -137,7 +157,7 @@ impl raw::TcpStream for TcpStream { impl Future for Connect { type Output = Result; - fn poll(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll { + fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll { match Pin::new(&mut self.get_mut().connect).poll(ctx) { Poll::Ready(Ok(stream)) => Poll::Ready(Ok(TcpClient { stream })), Poll::Ready(Err(err)) => Poll::Ready(Err(Box::new(err).into())), @@ -149,7 +169,7 @@ impl Future for Connect { impl<'i> Stream for TcpIncoming<'i> { type Item = Result; - fn poll_next(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll> { + fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll> { match Pin::new(&mut self.get_mut().incoming).poll_next(ctx) { Poll::Ready(Some(res)) => match res { Ok(stream) => Poll::Ready(Some(Ok(TcpStream { stream }))), @@ -161,30 +181,45 @@ impl<'i> Stream for TcpIncoming<'i> { } } +impl Stream for OwnedTcpIcoming { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll> { + match Pin::new(&mut self.get_mut().server.listener.accept()).poll(ctx) { + Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(TcpStream { stream }))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(Box::new(err).into()))), + Poll::Pending => Poll::Pending, + } + } +} + impl AsyncRead for TcpClient { fn poll_read( self: Pin<&mut Self>, - ctx: &mut FutContext, + ctx: &mut task::Context, buf: &mut [u8], - ) -> Poll> { - Pin::new(&mut self.get_mut().stream).poll_read(ctx, buf) + ) -> Poll> { + match Pin::new(&mut self.get_mut().stream).poll_read(ctx, buf) { + Poll::Ready(Ok(read)) if read == 0 => Poll::Pending, + polled => polled, + } } } impl AsyncWrite for TcpClient { fn poll_write( self: Pin<&mut Self>, - ctx: &mut FutContext, + ctx: &mut task::Context, buf: &[u8], - ) -> Poll> { + ) -> Poll> { Pin::new(&mut self.get_mut().stream).poll_write(ctx, buf) } - fn poll_flush(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll> { Pin::new(&mut self.get_mut().stream).poll_flush(ctx) } - fn poll_close(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll> { + fn poll_close(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll> { Pin::new(&mut self.get_mut().stream).poll_close(ctx) } } @@ -192,27 +227,30 @@ impl AsyncWrite for TcpClient { impl AsyncRead for TcpStream { fn poll_read( self: Pin<&mut Self>, - ctx: &mut FutContext, + ctx: &mut task::Context, buf: &mut [u8], - ) -> Poll> { - Pin::new(&mut self.get_mut().stream).poll_read(ctx, buf) + ) -> Poll> { + match Pin::new(&mut self.get_mut().stream).poll_read(ctx, buf) { + Poll::Ready(Ok(read)) if read == 0 => Poll::Pending, + polled => polled, + } } } impl AsyncWrite for TcpStream { fn poll_write( self: Pin<&mut Self>, - ctx: &mut FutContext, + ctx: &mut task::Context, buf: &[u8], - ) -> Poll> { + ) -> Poll> { Pin::new(&mut self.get_mut().stream).poll_write(ctx, buf) } - fn poll_flush(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll> { Pin::new(&mut self.get_mut().stream).poll_flush(ctx) } - fn poll_close(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll> { + fn poll_close(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll> { Pin::new(&mut self.get_mut().stream).poll_close(ctx) } } diff --git a/aktoro-runtime/src/udp.rs b/aktoro-runtime/src/udp.rs index d3ebd9e..f3f74ee 100644 --- a/aktoro-runtime/src/udp.rs +++ b/aktoro-runtime/src/udp.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::net::SocketAddr; use std::net::ToSocketAddrs; use std::pin::Pin; -use std::task::Context as FutContext; +use std::task; use std::task::Poll; use aktoro_raw as raw; @@ -80,7 +80,7 @@ impl raw::UdpSocket for UdpSocket { impl<'s, 'b> Future for SendTo<'s, 'b> { type Output = Result; - fn poll(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll { + fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll { match Pin::new(&mut self.get_mut().send_to).poll(ctx) { Poll::Ready(Ok(sent)) => Poll::Ready(Ok(sent)), Poll::Ready(Err(err)) => Poll::Ready(Err(Box::new(err).into())), @@ -92,7 +92,7 @@ impl<'s, 'b> Future for SendTo<'s, 'b> { impl<'s, 'b> Future for Recv<'s, 'b> { type Output = Result<(usize, SocketAddr), Error>; - fn poll(self: Pin<&mut Self>, ctx: &mut FutContext) -> Poll { + fn poll(self: Pin<&mut Self>, ctx: &mut task::Context) -> Poll { match Pin::new(&mut self.get_mut().recv_from).poll(ctx) { Poll::Ready(Ok(recved)) => Poll::Ready(Ok(recved)), Poll::Ready(Err(err)) => Poll::Ready(Err(Box::new(err).into())), diff --git a/ci/azure-pipelines.yml b/ci/azure-pipelines.yml index 4e84802..4d246c0 100644 --- a/ci/azure-pipelines.yml +++ b/ci/azure-pipelines.yml @@ -11,5 +11,5 @@ jobs: displayName: 'Test' - bash: | rustup component add clippy - cargo clippy --all-targets --all-features -- -D warnings + cargo clippy --all --all-features -- -D warnings displayName: 'Clippy' diff --git a/examples/hello_world/Cargo.toml b/examples/hello_world/Cargo.toml index c26c4d1..3f0c29f 100644 --- a/examples/hello_world/Cargo.toml +++ b/examples/hello_world/Cargo.toml @@ -14,4 +14,4 @@ version = "0.1.0-alpha.3" [dependencies.futures-util-preview] version = "0.3.0-alpha.17" -features = ["nightly", "async-await"] +features = ["nightly", "async-await", "select-macro"] diff --git a/examples/hello_world/src/main.rs b/examples/hello_world/src/main.rs index 4b84a33..9fcbb07 100644 --- a/examples/hello_world/src/main.rs +++ b/examples/hello_world/src/main.rs @@ -1,9 +1,9 @@ #![feature(async_await)] -use std::task::Poll; - use aktoro::prelude::*; -use futures_util::poll; +use futures_util::select; +use futures_util::FutureExt; +use futures_util::StreamExt; struct HelloActor; @@ -13,17 +13,15 @@ struct Bye(&'static str); struct Kill; impl Actor for HelloActor { - type Context = Context; - + type Context = Context; type Status = Status; - type Error = Error; } impl Handler for HelloActor { type Output = String; - fn handle(&mut self, msg: Hello, _: &mut Context) -> Result { + fn handle(&mut self, msg: Hello, _: &mut Self::Context) -> Result { Ok(format!("Hello, {}!", msg.0)) } } @@ -31,7 +29,7 @@ impl Handler for HelloActor { impl Handler for HelloActor { type Output = String; - fn handle(&mut self, msg: Bye, _: &mut Context) -> Result { + fn handle(&mut self, msg: Bye, _: &mut Self::Context) -> Result { Ok(format!("Bye, {}!", msg.0)) } } @@ -39,8 +37,8 @@ impl Handler for HelloActor { impl ActionHandler for HelloActor { type Output = (); - fn handle(&mut self, _: Kill, ctx: &mut Context) -> Result<(), Error> { - ctx.set_status(Status::Stopped); + fn handle(&mut self, _: Kill, ctx: &mut Self::Context) -> Result<(), Error> { + ctx.set_status(Status::Dead); Ok(()) } } @@ -51,18 +49,15 @@ async fn main() { let spawned = rt.spawn(HelloActor).unwrap(); - let mut run = runtime::spawn(run("World", spawned)); - let mut wait = rt.wait(); - - loop { - if let Poll::Ready(_) = poll!(&mut run) { - break; - } + let mut run = runtime::spawn(run("World", spawned)).fuse(); + let mut wait = rt.wait().fuse(); - if let Poll::Ready(res) = poll!(&mut wait) { - res.expect("an error occured while waiting for the runtime to stop"); - break; - } + select! { + _ = run => (), + res = wait.next() => { + res.unwrap() + .expect("an error occured while waiting for the runtime to stop"); + }, } } diff --git a/examples/net/Cargo.toml b/examples/net/Cargo.toml new file mode 100644 index 0000000..6c57892 --- /dev/null +++ b/examples/net/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "net" +version = "0.1.0" +license = "MIT" +authors = ["Matthieu Le Brazidec "] +edition = "2018" + +[dependencies] +futures-io-preview = "0.3.0-alpha.17" +runtime = "0.3.0-alpha.6" + +[dependencies.aktoro] +path = "../.." +version = "0.1.0-alpha.3" + +[dependencies.futures-util-preview] +version = "0.3.0-alpha.17" +features = ["nightly", "async-await"] diff --git a/examples/net/LICENSE.md b/examples/net/LICENSE.md new file mode 120000 index 0000000..f0608a6 --- /dev/null +++ b/examples/net/LICENSE.md @@ -0,0 +1 @@ +../../LICENSE.md \ No newline at end of file diff --git a/examples/net/src/agent.rs b/examples/net/src/agent.rs new file mode 100644 index 0000000..75bd02f --- /dev/null +++ b/examples/net/src/agent.rs @@ -0,0 +1,85 @@ +use std::pin::Pin; + +use aktoro::prelude::*; +use aktoro::raw; +use futures_util::io::ReadHalf; +use futures_util::io::WriteHalf; + +use crate::Received; +use crate::Sent; + +struct Closed; + +pub(crate) struct Agent { + pub(crate) read: Option>>>, + pub(crate) write: Option>>>, + pub(crate) cancellable: Option>>, +} + +impl Actor for Agent +where + S: raw::TcpStream + 'static, +{ + type Context = Context; + type Status = Status; + type Error = Error; + + fn started(&mut self, ctx: &mut Self::Context) { + println!("agent({}): started", ctx.actor_id()); + + self.cancellable = Some(ctx.read(self.read.take().unwrap(), 64, Received, |_| ())); + } +} + +impl Handler> for Agent +where + S: raw::TcpStream + 'static, +{ + type Output = (); + + fn handle(&mut self, _: Sent, ctx: &mut Self::Context) -> Result<(), Self::Error> { + println!("agent({}): sent data", ctx.actor_id()); + + println!("agent({}): closing stream", ctx.actor_id()); + ctx.wait(Box::pin(self.cancellable.take().unwrap().cancel()), |_| { + Closed + }); + + Ok(()) + } +} + +impl Handler for Agent +where + S: raw::TcpStream + 'static, +{ + type Output = (); + + fn handle(&mut self, msg: Received, ctx: &mut Self::Context) -> Result<(), Self::Error> { + println!("agent({}): received data; data={:?}", ctx.actor_id(), msg.0); + + ctx.write( + self.write.take().unwrap(), + vec![0], + |_, write| Sent(write), + |_| (), + ); + + Ok(()) + } +} + +impl Handler for Agent +where + S: raw::TcpStream + 'static, +{ + type Output = (); + + fn handle(&mut self, _: Closed, ctx: &mut Self::Context) -> Result<(), Self::Error> { + println!("agent({}): closed stream", ctx.actor_id()); + + ctx.set_status(Status::Dead); + + Ok(()) + } +} diff --git a/examples/net/src/client.rs b/examples/net/src/client.rs new file mode 100644 index 0000000..34c4635 --- /dev/null +++ b/examples/net/src/client.rs @@ -0,0 +1,108 @@ +use std::pin::Pin; + +use aktoro::prelude::*; +use aktoro::raw; +use futures_util::io::WriteHalf; +use futures_util::AsyncReadExt; + +use crate::Received; +use crate::Sent; + +struct Connected(C); + +struct ConnectedErr(::Error); + +pub(crate) struct Client { + pub(crate) connect: Option>>, + pub(crate) closed_connect: Option>>, + pub(crate) write: Option>>>, +} + +impl Actor for Client +where + C: raw::TcpClient + 'static, +{ + type Context = Context; + type Status = Status; + type Error = Error; + + fn started(&mut self, ctx: &mut Self::Context) { + println!("client({}): started", ctx.actor_id()); + println!("client({}): connecting", ctx.actor_id()); + + let connect = self.connect.take().unwrap(); + ctx.wait(connect, |res| Connected(res.unwrap())); + } +} + +impl Handler> for Client +where + C: raw::TcpClient + 'static, +{ + type Output = (); + + fn handle(&mut self, msg: Connected, ctx: &mut Self::Context) -> Result<(), Self::Error> { + println!("client({}): connected", ctx.actor_id()); + + let (read, write) = msg.0.split(); + ctx.read(Box::pin(read), 64, Received, |_| ()); + ctx.write(Box::pin(write), vec![0], |_, write| Sent(write), |_| ()); + + Ok(()) + } +} + +impl Handler> for Client +where + C: raw::TcpClient + 'static, +{ + type Output = (); + + fn handle(&mut self, msg: ConnectedErr, ctx: &mut Self::Context) -> Result<(), Self::Error> { + println!( + "client({}): failed connecting; reason={}", + ctx.actor_id(), + msg.0 + ); + + ctx.set_status(Status::Dead); + + Ok(()) + } +} + +impl Handler> for Client +where + C: raw::TcpClient + 'static, +{ + type Output = (); + + fn handle(&mut self, msg: Sent, ctx: &mut Self::Context) -> Result<(), Self::Error> { + println!("client({}): sent data", ctx.actor_id()); + + self.write = Some(msg.0); + + Ok(()) + } +} + +impl Handler for Client +where + C: raw::TcpClient + 'static, +{ + type Output = (); + + fn handle(&mut self, msg: Received, ctx: &mut Self::Context) -> Result<(), Self::Error> { + println!( + "client({}): received data; data={:?}", + ctx.actor_id(), + msg.0 + ); + + println!("client({}): connecting (failing)", ctx.actor_id()); + let connect = self.closed_connect.take().unwrap(); + ctx.wait(connect, |res| ConnectedErr(res.err().unwrap())); + + Ok(()) + } +} diff --git a/examples/net/src/main.rs b/examples/net/src/main.rs new file mode 100644 index 0000000..474ac6d --- /dev/null +++ b/examples/net/src/main.rs @@ -0,0 +1,46 @@ +#![feature(async_await)] + +use std::pin::Pin; + +use aktoro::prelude::*; +use aktoro::raw; +use futures_util::io::WriteHalf; +use futures_util::StreamExt; + +mod agent; +mod client; +mod server; + +use client::Client; +use server::Server; + +struct Sent(Pin>>); + +struct Received(Vec); + +#[runtime::main] +async fn main() { + let mut rt = Runtime::new(); + let net = rt.net(); + + let server = Server { + tcp: Some(net.tcp_bind("127.0.0.1:5555").unwrap()), + cancellable: None, + }; + + rt.spawn(server).unwrap(); + + let client = Client:: { + connect: Some(Box::pin(net.tcp_connect("127.0.0.1:5555").unwrap())), + closed_connect: Some(Box::pin(net.tcp_connect("127.0.0.1:5555").unwrap())), + write: None, + }; + + rt.spawn(client).unwrap(); + + let mut wait = rt.wait(); + + while let Some(res) = wait.next().await { + res.expect("an error occured while waiting for the runtime to stop"); + } +} diff --git a/examples/net/src/server.rs b/examples/net/src/server.rs new file mode 100644 index 0000000..0f14802 --- /dev/null +++ b/examples/net/src/server.rs @@ -0,0 +1,86 @@ +use aktoro::prelude::*; +use aktoro::raw; +use futures_util::AsyncReadExt; +use futures_util::StreamExt; + +use crate::agent::Agent; + +struct Connection(S::Stream); + +struct Died; + +pub(crate) struct Server { + pub(crate) tcp: Option, + pub(crate) cancellable: Option>>, +} + +impl Actor for Server +where + S: raw::TcpServer + 'static, +{ + type Context = Context; + type Status = Status; + type Error = Error; + + fn started(&mut self, ctx: &mut Self::Context) { + println!("server({}): started", ctx.actor_id()); + println!("server({}): listening", ctx.actor_id()); + + let tcp = self.tcp.take().unwrap(); + self.cancellable = Some( + ctx.subscribe(Box::pin(tcp.into_incoming().unwrap()), |conn| { + Connection(conn.unwrap()) + }), + ); + } +} + +impl Handler> for Server +where + S: raw::TcpServer + 'static, +{ + type Output = (); + + fn handle(&mut self, msg: Connection, ctx: &mut Self::Context) -> Result<(), Self::Error> { + println!("server({}): new connection", ctx.actor_id()); + + let actor_id = ctx.actor_id(); + println!("server({}): closing", actor_id); + ctx.blocking_wait( + Box::pin(self.cancellable.take().unwrap().cancel()), + move |_| println!("server({}): closed", actor_id), + ); + + let (read, write) = msg.0.split(); + let spawned = ctx + .spawn(Agent { + read: Some(Box::pin(read)), + write: Some(Box::pin(write)), + cancellable: None, + }) + .unwrap(); + + ctx.subscribe(spawned.boxed(), |_| Died); + + Ok(()) + } +} + +impl Handler for Server +where + S: raw::TcpServer + 'static, +{ + type Output = (); + + fn handle(&mut self, _: Died, ctx: &mut Self::Context) -> Result<(), Self::Error> { + println!( + "server({}): agent died; remaining: {:?}", + ctx.actor_id(), + ctx.actors() + ); + + ctx.set_status(Status::Dead); + + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 7b0a88f..34cf4e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,8 @@ pub use aktoro_runtime as runtime; pub mod prelude { pub use aktoro_raw::Actor; + pub use aktoro_raw::Cancellable; + pub use aktoro_raw::Cancelling; pub use aktoro_raw::Context as RawContext; pub use aktoro_raw::ActionHandler; @@ -24,8 +26,11 @@ pub mod prelude { pub use aktoro_raw::Updated as RawUpdated; pub use aktoro_raw::Updater as RawUpdater; + pub use aktoro_raw::NetworkManager as RawNetworkManager; + pub use aktoro_raw::OwnedTcpServerIncoming; pub use aktoro_raw::TcpClient as RawTcpClient; pub use aktoro_raw::TcpServer as RawTcpServer; + pub use aktoro_raw::TcpServerIncoming; pub use aktoro_raw::TcpStream as RawTcpStream; pub use aktoro_raw::UdpSocket as RawUdpSocket;