From 4a3e3647fdc8c910d01b63f9cace312b1e64ea8b Mon Sep 17 00:00:00 2001 From: Aaron Arinder Date: Thu, 19 Sep 2024 16:02:17 -0400 Subject: [PATCH] document(subtask): adding docs for Subtasks and the associated traits --- src/composition/watchers/subtask.rs | 85 +++++++++++++++++++++++++---- 1 file changed, 75 insertions(+), 10 deletions(-) diff --git a/src/composition/watchers/subtask.rs b/src/composition/watchers/subtask.rs index 0d09ed18dd..0fd78f8411 100644 --- a/src/composition/watchers/subtask.rs +++ b/src/composition/watchers/subtask.rs @@ -1,4 +1,58 @@ -// TODO: we should document this +//! A Subtask is a task that runs in the background and is able to receive events and (if the +//! SubtaskHandleStream trait is implemented) emit events via unbounded channels. +//! +//! There are two important traits that you'll implement in order to use Subtasks: +//! +//! SubtaskHandleUnit - for receiving events, +//! SubtaskHandleStream - for both receiving and emitting events +//! +//! There are examples in the codebase for both, but they follow a similar pattern: a `handle` +//! function is implemented that receives an `UnboundedSender` for some `Output` +//! defined by the consumer. For `SubtaskHandleStream`, an `input` is required for receiving +//! events +//! +//! +//! Here is an example implementation of implementing `SubtaskHandleUnit` +//! +//! ```rust,ignore +//! impl SubtaskHandleUnit for SomeType { +//! // maybe this exists above, maybe it's something simple like an +//! //empty tuple +//! type Output = SomeOutput; +//! +//! fn handle(self, sender: UnboundedSender) -> AbortHandle { +//! tokio::spawn(async move { +//! // Make sure you keep the watcher's from being called multiple times by +//! // putting it on its own line +//! let mut watcher = self.some_fn_for_recv_events().await; +//! // Watching for events is pretty straightforward when you have a BoxStream, which +//! // gives you .next() +//! while let Some(_change) = watcher.next().await { +//! // When something happens that we want to react to, we can emit an event +//! let _ = sender +//! .send(SomeType) +//! .tap_err(|err| tracing::error!("{:?}", err)); +//! } +//! }) +//! // An abort handle is returned in case we need to abort the task (eg, for some failure +//! // outside of it) +//! .abort_handle() +//! } +//! } +//! ``` +//! +//! Once you've implemented either SubtaskHandleUnit or SubtaskHandleStream for a type, you can +//! `.run()` it to begin the Subtask and receive an UnboundedStream for sending the events being +//! emitted from that Subtask to other consumers: +//! +//! ```rust,ignore +//! // Create the SomeType Subtask, returning a receiver for others to stream events coming out +//! // of the SomeType subtask +//! let (events_for_others_to_ingest, sometype_subtask) = Subtask::new(SomeType); +//! +//! // Listen to events coming from some other Subtask +//! sometype_subtask.run(some_other_event_stream); +//! ``` use futures::stream::BoxStream; use tokio::{ @@ -7,20 +61,13 @@ use tokio::{ }; use tokio_stream::wrappers::UnboundedReceiverStream; -pub trait SubtaskRunUnit { - fn run(self) -> AbortHandle; -} - -pub trait SubtaskRunStream { - type Input; - fn run(self, input: BoxStream<'static, Self::Input>) -> AbortHandle; -} - +/// A trait whose implementation will be able send events pub trait SubtaskHandleUnit { type Output; fn handle(self, sender: UnboundedSender) -> AbortHandle; } +/// A trait whose implementation will be able to both send and receive events pub trait SubtaskHandleStream { type Input; type Output; @@ -31,12 +78,27 @@ pub trait SubtaskHandleStream { ) -> AbortHandle; } +/// A trait whose implementation can run a subtask that only ingests messages +pub trait SubtaskRunUnit { + fn run(self) -> AbortHandle; +} + +/// A trait whose implementation can run a subtask that can both ingest messages and emit them +pub trait SubtaskRunStream { + type Input; + fn run(self, input: BoxStream<'static, Self::Input>) -> AbortHandle; +} + +/// A background task that can emit messages via a sender channel pub struct Subtask { inner: T, sender: UnboundedSender, } impl Subtask { + /// Crates a new Subtask with unbounded channels for transmitting and receiving. The + /// transmitter is returned to the caller so that it can be used to send messages to the + /// Subtask's receiver pub fn new(inner: T) -> (UnboundedReceiverStream, Subtask) { let (tx, rx) = unbounded_channel(); ( @@ -47,6 +109,7 @@ impl Subtask { } impl, Output> SubtaskRunUnit for Subtask { + /// Begin running the subtask, calling handle() on the type implementing the SubTaskHandleUnit trait fn run(self) -> AbortHandle { self.inner.handle(self.sender) } @@ -54,6 +117,8 @@ impl, Output> SubtaskRunUnit for Subtask, Output> SubtaskRunStream for Subtask { type Input = T::Input; + + /// Begin running the subtask with a stream of events, calling handle() on the type implementing the SubTaskHandleStream trait fn run(self, input: BoxStream<'static, Self::Input>) -> AbortHandle { self.inner.handle(self.sender, input) }