From 84c88c6540972e2df264548364601bdf2a02f335 Mon Sep 17 00:00:00 2001 From: Dan Bond Date: Fri, 27 Sep 2024 12:44:42 +0100 Subject: [PATCH 1/2] document composition/runner --- src/composition/runner.rs | 60 +++++++++++++++++++++++++++-- src/composition/watchers/subtask.rs | 2 +- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/src/composition/runner.rs b/src/composition/runner.rs index eaaf2ed21..f1987b4ad 100644 --- a/src/composition/runner.rs +++ b/src/composition/runner.rs @@ -1,3 +1,29 @@ +//! A [`Runner`] provides methods for configuring and handling background tasks for producing +//! composition events based of supergraph config changes. +//! +//! ```rust,ignore +//! use apollo_federation_types::config::SupergraphConfig; +//! use tokio_stream::wrappers::UnboundedReceiverStream; +//! +//! use crate::composition::{ +//! events::CompositionEvent, +//! runner::Runner, +//! supergraph::binary::SupergraphBinary, +//! }; +//! +//! let supergraph_config = SupergraphConfig::new(); +//! let supergraph_binary = SupergraphBinary::new(); +//! +//! let runner = Runner::new(supergraph_config, supergraph_binary); +//! let stream = runner.run().await.unwrap(); +//! while let Some(event) = stream.next().await { +//! match event { +//! CompositionEvent::Started => println!("composition started"), +//! CompositionEvent::Success(_) => println!("composition success"), +//! CompositionEvent::Error(_) => println!("composition serror"), +//! } +//! } +//! ``` use std::collections::HashMap; use apollo_federation_types::config::SupergraphConfig; @@ -28,6 +54,8 @@ use super::{ }, }; +/// A struct for configuring and running subtasks for watching for both supergraph and subgraph +/// change events. // TODO: handle retry flag for subgraphs (see rover dev help) pub struct Runner { supergraph_config: FinalSupergraphConfig, @@ -45,7 +73,11 @@ impl Runner { } } + /// Start subtask watchers for both supergraph and subgraph configs, sending composition events on + /// the returned stream. pub async fn run(self) -> RoverResult> { + // Attempt to get a supergraph config stream and file based watcher subtask for receiving + // change events. let (supergraph_config_stream, supergraph_config_subtask) = match self.supergraph_config_subtask() { Some((supergraph_diff_stream, supergraph_config_subtask)) => ( @@ -55,20 +87,30 @@ impl Runner { None => (empty().boxed(), None), }; + // Construct watchers based on subgraph definitions in the given supergraph config. let subgraph_config_watchers = SubgraphWatchers::new(self.supergraph_config.clone().into()); + // Create a new subtask to handle events from the given subgraph watchers, receiving + // messages on the returned stream. let (subgraph_changed_messages, subgraph_config_watchers_subtask) = Subtask::new(subgraph_config_watchers); + // Create a handler for supergraph composition events. let composition_handler = RunComposition::builder() .supergraph_config(self.supergraph_config) .supergraph_binary(self.supergraph_binary) .exec_command(TokioCommand::default()) .read_file(FsReadFile::default()) .build(); - let (composition_messages, composition_subtask) = Subtask::new(composition_handler); + // Create a new subtask for the composition handler, passing in a stream of subgraph change + // events in order to trigger recomposition. + let (composition_messages, composition_subtask) = Subtask::new(composition_handler); composition_subtask.run(subgraph_changed_messages.boxed()); + + // Start subgraph watchers, listening for events from the supergraph change stream. subgraph_config_watchers_subtask.run(supergraph_config_stream); + + // Start the supergraph watcher subtask. if let Some(supergraph_config_subtask) = supergraph_config_subtask { supergraph_config_subtask.run(); } @@ -84,6 +126,10 @@ impl Runner { )> { let supergraph_config: SupergraphConfig = self.supergraph_config.clone().into(); + // If the supergraph config was passed as a file, we can configure a watcher for change + // events. + // We could return None here if we received a supergraph config directly from stdin. In + // that case, we don't want to configure a watcher. if let Some(origin_path) = self.supergraph_config.origin_path() { let f = FileWatcher::new(origin_path.clone()); let watcher = SupergraphConfigWatcher::new(f, supergraph_config.clone()); @@ -105,6 +151,7 @@ struct SubgraphWatchers { } impl SubgraphWatchers { + /// Create a set of watchers from the subgraph definitions of a supergraph config. pub fn new(supergraph_config: SupergraphConfig) -> SubgraphWatchers { let watchers = supergraph_config .into_iter() @@ -130,6 +177,11 @@ impl SubtaskHandleStream for SubgraphWatchers { ) -> AbortHandle { tokio::task::spawn(async move { let mut abort_handles: HashMap = HashMap::new(); + // Start a background task for each of the subtask watchers that listens for change + // events and send each event to the parent sender to be consumed by the composition + // handler. + // We also collect the abort handles for each background task in order to gracefully + // shut down. for (subgraph_name, (mut messages, subtask)) in self.watchers.into_iter() { let sender = sender.clone(); let messages_abort_handle = tokio::task::spawn(async move { @@ -144,9 +196,10 @@ impl SubtaskHandleStream for SubgraphWatchers { abort_handles.insert(subgraph_name, (messages_abort_handle, subtask_abort_handle)); } - // for supergraph diff events + // Wait for supergraph diff events received from the input stream. while let Some(diff) = input.next().await { - // for new subgraphs added to the session + // If we detect additional diffs, start a new subgraph subtask. + // Adding the abort handle to the currentl collection of handles. for (name, subgraph_config) in diff.added() { if let Ok((mut messages, subtask)) = SubgraphWatcher::try_from(subgraph_config.schema.clone()) @@ -174,6 +227,7 @@ impl SubtaskHandleStream for SubgraphWatchers { ); } } + // If we detect removal diffs, stop the subtask for the removed subgraph. for name in diff.removed() { if let Some((messages_abort_handle, subtask_abort_handle)) = abort_handles.get(name) diff --git a/src/composition/watchers/subtask.rs b/src/composition/watchers/subtask.rs index c44072387..8868807ad 100644 --- a/src/composition/watchers/subtask.rs +++ b/src/composition/watchers/subtask.rs @@ -61,7 +61,7 @@ use tokio::{ }; use tokio_stream::wrappers::UnboundedReceiverStream; -/// A trait whose implementation will be able send events +/// A trait whose implementation will be able to send events pub trait SubtaskHandleUnit { type Output; fn handle(self, sender: UnboundedSender) -> AbortHandle; From 6200d16721386676c9fc0aa7f2b1e6b340b219ac Mon Sep 17 00:00:00 2001 From: Dan Bond Date: Fri, 27 Sep 2024 19:21:52 +0100 Subject: [PATCH 2/2] add rust lints --- src/composition/runner.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/composition/runner.rs b/src/composition/runner.rs index f1987b4ad..d8a55e5cd 100644 --- a/src/composition/runner.rs +++ b/src/composition/runner.rs @@ -24,6 +24,7 @@ //! } //! } //! ``` +#![warn(missing_docs)] use std::collections::HashMap; use apollo_federation_types::config::SupergraphConfig; @@ -63,6 +64,7 @@ pub struct Runner { } impl Runner { + /// Produces a new Runner from a supergraph config and binary. pub fn new( supergraph_config: FinalSupergraphConfig, supergraph_binary: SupergraphBinary,