Skip to content

Commit

Permalink
Flesh out documentation more and add a prelude (#22)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai>
  • Loading branch information
mxgrey authored Aug 26, 2024
1 parent 2b1ca3b commit 3f6baa2
Show file tree
Hide file tree
Showing 20 changed files with 282 additions and 82 deletions.
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
# :warning: Under Construction :warning:

# Reactive Programming for Bevy

This library provides sophisticated [reactive programming](https://en.wikipedia.org/wiki/Reactive_programming) for the [bevy](https://bevyengine.org/) ECS. In addition to supporting one-shot chains of async operations, it can support reusable workflows with parallel branches, synchronization, races, and cycles. These workflows can be hierarchical, so workflows can be built out of other workflows.
This library provides sophisticated [reactive programming](https://en.wikipedia.org/wiki/Reactive_programming) for the [bevy](https://bevyengine.org/) ECS. In addition to supporting one-shot chains of async operations, it can support reusable workflows with parallel branches, synchronization, races, and cycles. These workflows can be hierarchical, so a workflow can be used as a building block by other workflows.

![sense-think-act workflow](assets/figures/sense-think-act_workflow.svg)

# Why use bevy impulse?

There are several different categories of problems that bevy impulse sets out to solve. If any one of these use-cases is relevant to you, it's worth considering bevy impulse as a solution:

* Coordinating **async activities** (e.g. filesystem i/o, network i/o, or long-running calculations) with regular bevy systems
* Calling **one-shot systems** on an ad hoc basis, where the systems require an input value and produce an output value that you need to use
* Defining a **procedure** to be followed by your application or by an agent or pipeline within your application
* Designing a complex **state machine** that gradually switches between different modes or behaviors while interacting with the world
* Managing many **parallel threads** of activities that need to be synchronized or raced against each other

# Helpful Links

Expand Down
1 change: 1 addition & 0 deletions assets/figures/sense-think-act_workflow.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ where

#[cfg(test)]
mod tests {
use crate::{testing::*, *};
use crate::{prelude::*, testing::*, Gate};
use std::future::Future;

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ impl CleanupWorkflowConditions {

#[cfg(test)]
mod tests {
use crate::{testing::*, *};
use crate::{prelude::*, testing::*, CancellationCause};
use smallvec::SmallVec;

#[test]
Expand Down
96 changes: 79 additions & 17 deletions src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,39 +33,101 @@ use std::{
sync::{Arc, Mutex},
};

/// A Callback is similar to a [`Service`](crate::Service) except it is not
/// associated with an [`Entity`]. Instead it can be passed around and shared as an
/// object. Cloning the Callback will produce a new reference to the same underlying
/// instance. If the Callback has any internal state (e.g. [`Local`](bevy::prelude::Local)
/// A Callback is an object that implements [`Provider`], similar to [`Service`](crate::Service),
/// except it is not associated with an [`Entity`]. Instead it can be passed around and
/// shared as its own object. Cloning a Callback will produce a new reference to the
/// same underlying instance. If the Callback has any internal state (e.g. [`Local`](bevy_ecs::prelude::Local)
/// parameters, change trackers, or mutable captured variables), that internal state will
/// be shared among all its clones.
///
/// There are three ways to instantiate a callback:
///
/// ### [`.as_callback()`](AsCallback)
/// ## [`.as_callback()`](AsCallback)
///
/// If you have a Bevy system with an input parameter of `In<`[`AsyncCallback`]`>`
/// or `In<`[`BlockingCallback`]`>` then you can convert it into a callback
/// If you have a Bevy system with an input parameter of `In<`[`BlockingCallback`]`>`
/// or `In<`[`AsyncCallback`]`>` then you can convert it into a [`Callback`]
/// object by applying `.as_callback()`.
///
/// ### [`.into_async_callback()`](IntoAsyncCallback)
/// ```rust
/// use bevy_impulse::{prelude::*, testing::Integer};
/// use bevy_ecs::prelude::*;
///
/// If you have a Bevy system whose return type implements the [`Future`] trait,
/// it can be converted into an async callback object by applying
/// `.into_async_callback()` to it. The `Response` type of the callback will be
/// `Future::Output` rather than the return type of the system. The return value
/// will be polled in an async compute task pool.
/// fn add_integer(
/// In(input): In<BlockingCallback<i32>>,
/// integer: Res<Integer>,
/// ) -> i32 {
/// input.request + integer.value
/// }
///
/// let callback = add_integer.as_callback();
/// ```
///
/// ```rust
/// use bevy_impulse::{prelude::*, testing::Integer};
/// use bevy_ecs::prelude::*;
/// use std::future::Future;
///
/// ### [`.into_blocking_callback()`](IntoBlockingCallback)
/// fn add_integer_async(
/// In(input): In<AsyncCallback<i32>>,
/// integer: Res<Integer>,
/// ) -> impl Future<Output = i32> {
/// let value = integer.value;
/// async move { input.request + value }
/// }
///
/// Any Bevy system can be converted into a blocking callback by applying
/// let async_callback = add_integer_async.as_callback();
/// ```
///
/// ## [`.into_blocking_callback()`](IntoBlockingCallback)
///
/// Any Bevy system can be converted into a blocking [`Callback`] by applying
/// `.into_blocking_callback()` to it. The `Request` type of the callback will
/// be whatever the input type of the system is (the `T` inside of `In<T>`). The
/// `Response` type of the callback will be whatever the return value of the
/// callback is.
///
/// A blocking callback is always an exclusive system, so it will block all
/// other systems from running until it is finished.
/// A blocking callback is always run as an exclusive system (even if it does not
/// use exclusive system parameters), so it will block all other systems from
/// running until it is finished.
///
/// ```rust
/// use bevy_impulse::{prelude::*, testing::Integer};
/// use bevy_ecs::prelude::*;
///
/// fn add_integer(
/// In(input): In<i32>,
/// integer: Res<Integer>,
/// ) -> i32 {
/// input + integer.value
/// }
///
/// let callback = add_integer.into_blocking_callback();
/// ```
///
/// ## [`.into_async_callback()`](IntoAsyncCallback)
///
/// If you have a Bevy system whose return type implements the [`Future`] trait,
/// it can be converted into an async [`Callback`] object by applying
/// `.into_async_callback()` to it. The `Response` type of the callback will be
/// `<T as Future>::Output` where `T` is the return type of the system. The `Future`
/// returned by the system will be polled in the async compute task pool (unless
/// you activate the `single_threaded_async` feature).
///
/// ```rust
/// use bevy_impulse::{prelude::*, testing::Integer};
/// use bevy_ecs::prelude::*;
/// use std::future::Future;
///
/// fn add_integer(
/// In(input): In<i32>,
/// integer: Res<Integer>,
/// ) -> impl Future<Output = i32> {
/// let value = integer.value;
/// async move { input + value }
/// }
///
/// let callback = add_integer.into_async_callback();
/// ```
pub struct Callback<Request, Response, Streams = ()> {
pub(crate) inner: Arc<Mutex<InnerCallback<Request, Response, Streams>>>,
}
Expand Down
8 changes: 4 additions & 4 deletions src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
}

/// Apply a map whose output is a Future that will be run in the
/// [`AsyncComputeTaskPool`](bevy::tasks::AsyncComputeTaskPool) (unless
/// [`AsyncComputeTaskPool`](bevy_tasks::AsyncComputeTaskPool) (unless
/// the `single_threaded_async` feature is active). The output of the Future
/// will be the Response of the returned Chain.
pub fn map_async<Task>(
Expand Down Expand Up @@ -723,7 +723,7 @@ where
/// that trait, then you can use [`Self::cancel_on_quiet_err`] instead.
///
/// ```
/// use bevy_impulse::{*, testing::*};
/// use bevy_impulse::{prelude::*, testing::*};
///
/// let mut context = TestingContext::minimal_plugins();
///
Expand Down Expand Up @@ -959,7 +959,7 @@ where
/// various reasons, this returns a [`Result`]. Follow this with
/// `.dispose_on_err` to filter away errors.
///
/// To access the streams of the service, use [`Chain::then_request_node`].
/// To access the streams of the service, use [`Chain::then_injection_node`].
pub fn then_injection(self) -> Chain<'w, 's, 'a, 'b, Response> {
let source = self.target;
let node = self
Expand Down Expand Up @@ -1032,7 +1032,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {

#[cfg(test)]
mod tests {
use crate::{testing::*, *};
use crate::{prelude::*, testing::*};
use smallvec::SmallVec;

#[test]
Expand Down
6 changes: 5 additions & 1 deletion src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ use crate::{
StreamRequest,
};

/// Provides asynchronous access to the [`World`], allowing you to issue queries
/// or commands and then await the result.
#[derive(Clone)]
pub struct Channel {
inner: Arc<InnerChannel>,
}

impl Channel {
/// Run a query in the world and receive the promise of the query's output.
pub fn query<P>(&self, request: P::Request, provider: P) -> Promise<P::Response>
where
P: Provider,
Expand All @@ -49,6 +52,7 @@ impl Channel {
.flatten()
}

/// Get access to a [`Commands`] for the [`World`]
pub fn command<F, U>(&self, f: F) -> Promise<U>
where
F: FnOnce(&mut Commands) -> U + 'static + Send,
Expand Down Expand Up @@ -170,7 +174,7 @@ impl<T: Stream> StreamChannel<T> {

#[cfg(test)]
mod tests {
use crate::{testing::*, *};
use crate::{prelude::*, testing::*};
use bevy_ecs::system::EntityCommands;
use std::time::Duration;

Expand Down
6 changes: 3 additions & 3 deletions src/gate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ impl Gate {
/// [2]: crate::Chain::then_gate
/// [3]: crate::Builder::create_gate_open
/// [4]: crate::Builder::create_gate_close
/// [5]: crate::chain::then_gate_open
/// [6]: crate::chain::then_gate_close
/// [5]: crate::Chain::then_gate_open
/// [6]: crate::Chain::then_gate_close
pub struct GateRequest<T> {
/// Indicate what action the gate should take
pub action: Gate,
Expand All @@ -75,7 +75,7 @@ pub struct GateRequest<T> {

#[cfg(test)]
mod tests {
use crate::{testing::*, *};
use crate::{prelude::*, testing::*};

#[test]
fn test_gate_actions() {
Expand Down
4 changes: 2 additions & 2 deletions src/impulse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ where
/// feature is active). The output of the [`Future`] will be the Response of
/// the returned Impulse.
///
/// [1]: bevy::tasks::AsyncComputeTaskPool
/// [1]: bevy_tasks::AsyncComputeTaskPool
#[must_use]
pub fn map_async<Task>(
self,
Expand Down Expand Up @@ -335,7 +335,7 @@ impl<T> Default for Collection<T> {

#[cfg(test)]
mod tests {
use crate::{testing::*, *};
use crate::{prelude::*, testing::*, ContinuousQueueView};
use bevy_utils::label::DynEq;
use smallvec::SmallVec;
use std::{
Expand Down
67 changes: 53 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,37 @@
*
*/

//! `bevy_impulse` is an extension to the [Bevy](https://bevyengine.org) game
//! ![sense-think-act workflow](https://raw.githubusercontent.com/open-rmf/bevy_impulse/update_docs/assets/figures/sense-think-act_workflow.svg)
//!
//! Bevy impulse is an extension to the [Bevy](https://bevyengine.org) game
//! engine that allows you to transform [bevy systems](https://bevyengine.org/learn/quick-start/getting-started/ecs/)
//! into services and workflows that can be used for reactive service-oriented
//! programming.
//!
//! ## Services
//!
//! One primitive of reactive programming is a [service](https://en.wikipedia.org/wiki/Service_(systems_architecture)).
//! In `bevy_impulse`, a service is a bevy system that is associated with an
//! One primitive element of reactive programming is a [service](https://en.wikipedia.org/wiki/Service_(systems_architecture)).
//! In bevy impulse, a [`Service`] is a bevy system that is associated with an
//! entity and can be created using [`Commands::spawn_service`](SpawnServicesExt::spawn_service)
//! or [`App::add_service`](AddServicesExt::add_service).
//!
//! When you [spawn](SpawnServicesExt::spawn_service) a service you will
//! immediately receive a [`Service`] object which can be used to refer to it.
//! If you do not want to hang onto the service object, you can find previously
//! spawned services later using the [`ServiceDiscovery`] system parameter.
//! When you spawn a service you will immediately receive a [`Service`] object
//! which references the newly spawned service. If you do not want to hang onto the [`Service`]
//! object, you can find previously spawned services later using the [`ServiceDiscovery`]
//! system parameter.
//!
//! Sometimes [`Service`] is not quite the right fit for your use case, so bevy impulse
//! offers a generalization of services callled [`Provider`] which has some
//! more options for defining a reactive element.
//!
//! ## Workflows
//!
//! For complex async workflows, a single bevy system may not be sufficient.
//! You can instead build workflows using [`Command::spawn_workflow`](SpawnWorkflow::spawn_workflow).
//! You can instead build workflows using [`.spawn_workflow`](SpawnWorkflowExt::spawn_workflow)
//! on [`Commands`](bevy_ecs::prelude::Commands) or [`World`](bevy_ecs::prelude::World).
//! A workflow lets you create a graph of [nodes](Node) where each node is a
//! service with an input, an output, and possibly streams.
//! [service](Service) (or more generally a [provider](Provider)) with an input,
//! an output, and possibly streams.
//!
//! There are various operations that can be performed between nodes, such as
//! forking and joining. These operations are built using [`Chain`].
Expand All @@ -54,8 +62,8 @@
//! [`Impulse::then`]. Any impulse chain that you create will only run exactly
//! once.
//!
//! Once you've finished creating your chain, use [`Impulse::detach`] to let it
//! run freely, or use [`Impulse::take`] to receive a [`Promise`] of the final
//! Once you've finished building your chain, use [`Impulse::detach`] to let it
//! run freely, or use [`Impulse::take`] to get a [`Recipient`] of the final
//! result.

mod async_execution;
Expand Down Expand Up @@ -149,7 +157,7 @@ use bevy_ecs::prelude::{Entity, In};
///
/// ```
/// use bevy_ecs::prelude::*;
/// use bevy_impulse::*;
/// use bevy_impulse::prelude::*;
///
/// #[derive(Component, Resource)]
/// struct Precision(i32);
Expand Down Expand Up @@ -256,7 +264,7 @@ pub struct AsyncCallback<Request, Streams: StreamPack = ()> {
/// `StreamChannel`s, whichever matches the [`StreamPack`] description.
pub streams: Streams::Channel,
/// The channel that allows querying and syncing with the world while the
/// service runs asynchronously.
/// callback executes asynchronously.
pub channel: Channel,
/// The node in a workflow or impulse chain that asked for the callback
pub source: Entity,
Expand Down Expand Up @@ -297,7 +305,7 @@ pub struct AsyncMap<Request, Streams: StreamPack = ()> {
/// `StreamChannel`s, whichever matches the [`StreamPack`] description.
pub streams: Streams::Channel,
/// The channel that allows querying and syncing with the world while the
/// service runs asynchronously.
/// map executes asynchronously.
pub channel: Channel,
/// The node in a workflow or impulse chain that asked for the callback
pub source: Entity,
Expand All @@ -319,3 +327,34 @@ impl Plugin for ImpulsePlugin {
app.add_systems(Update, flush_impulses());
}
}

pub mod prelude {
pub use crate::{
buffer::{
Buffer, BufferAccess, BufferAccessMut, BufferKey, BufferSettings, Bufferable, Buffered,
IterBufferable, RetentionPolicy,
},
builder::Builder,
callback::{AsCallback, Callback, IntoAsyncCallback, IntoBlockingCallback},
chain::{Chain, ForkCloneBuilder, UnzipBuilder, Unzippable},
flush::flush_impulses,
impulse::{Impulse, Recipient},
map::{AsMap, IntoAsyncMap, IntoBlockingMap},
map_once::{AsMapOnce, IntoAsyncMapOnce, IntoBlockingMapOnce},
node::{ForkCloneOutput, InputSlot, Node, Output},
promise::{Promise, PromiseState},
provider::{ProvideOnce, Provider},
request::{RequestExt, RunCommandsOnWorldExt},
service::{
traits::*, AddContinuousServicesExt, AddServicesExt, AsDeliveryInstructions,
DeliveryInstructions, DeliveryLabel, DeliveryLabelId, IntoAsyncService,
IntoBlockingService, Service, ServiceDiscovery, SpawnServicesExt,
},
stream::{Stream, StreamFilter, StreamOf, StreamPack},
trim::{TrimBranch, TrimPoint},
workflow::{DeliverySettings, Scope, ScopeSettings, SpawnWorkflowExt, WorkflowSettings},
AsyncCallback, AsyncCallbackInput, AsyncMap, AsyncService, AsyncServiceInput,
BlockingCallback, BlockingCallbackInput, BlockingMap, BlockingService,
BlockingServiceInput, ContinuousQuery, ContinuousService, ContinuousServiceInput,
};
}
Loading

0 comments on commit 3f6baa2

Please sign in to comment.