Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

document composition/runner #2181

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 59 additions & 3 deletions src/composition/runner.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,30 @@
//! A [`Runner`] provides methods for configuring and handling background tasks for producing
loshz marked this conversation as resolved.
Show resolved Hide resolved
//! composition events based of supergraph config changes.
//!
//! ```rust,ignore
//! use apollo_federation_types::config::SupergraphConfig;
//! use tokio_stream::wrappers::UnboundedReceiverStream;
//!
//! use crate::composition::{
//! events::CompositionEvent,
//! runner::Runner,
//! supergraph::binary::SupergraphBinary,
//! };
//!
//! let supergraph_config = SupergraphConfig::new();
//! let supergraph_binary = SupergraphBinary::new();
//!
//! let runner = Runner::new(supergraph_config, supergraph_binary);
//! let stream = runner.run().await.unwrap();
//! while let Some(event) = stream.next().await {
//! match event {
//! CompositionEvent::Started => println!("composition started"),
//! CompositionEvent::Success(_) => println!("composition success"),
//! CompositionEvent::Error(_) => println!("composition serror"),
//! }
//! }
//! ```
#![warn(missing_docs)]
use std::collections::HashMap;

use apollo_federation_types::config::SupergraphConfig;
Expand Down Expand Up @@ -28,13 +55,16 @@ use super::{
},
};

/// A struct for configuring and running subtasks for watching for both supergraph and subgraph
/// change events.
// TODO: handle retry flag for subgraphs (see rover dev help)
pub struct Runner {
supergraph_config: FinalSupergraphConfig,
supergraph_binary: SupergraphBinary,
}

impl Runner {
/// Produces a new Runner from a supergraph config and binary.
pub fn new(
supergraph_config: FinalSupergraphConfig,
supergraph_binary: SupergraphBinary,
Expand All @@ -45,7 +75,11 @@ impl Runner {
}
}

/// Start subtask watchers for both supergraph and subgraph configs, sending composition events on
/// the returned stream.
pub async fn run(self) -> RoverResult<UnboundedReceiverStream<CompositionEvent>> {
// Attempt to get a supergraph config stream and file based watcher subtask for receiving
// change events.
let (supergraph_config_stream, supergraph_config_subtask) =
match self.supergraph_config_subtask() {
Some((supergraph_diff_stream, supergraph_config_subtask)) => (
Expand All @@ -55,20 +89,30 @@ impl Runner {
None => (empty().boxed(), None),
};

// Construct watchers based on subgraph definitions in the given supergraph config.
let subgraph_config_watchers = SubgraphWatchers::new(self.supergraph_config.clone().into());
// Create a new subtask to handle events from the given subgraph watchers, receiving
// messages on the returned stream.
let (subgraph_changed_messages, subgraph_config_watchers_subtask) =
Subtask::new(subgraph_config_watchers);

// Create a handler for supergraph composition events.
let composition_handler = RunComposition::builder()
.supergraph_config(self.supergraph_config)
.supergraph_binary(self.supergraph_binary)
.exec_command(TokioCommand::default())
.read_file(FsReadFile::default())
.build();
let (composition_messages, composition_subtask) = Subtask::new(composition_handler);

// Create a new subtask for the composition handler, passing in a stream of subgraph change
// events in order to trigger recomposition.
let (composition_messages, composition_subtask) = Subtask::new(composition_handler);
composition_subtask.run(subgraph_changed_messages.boxed());

// Start subgraph watchers, listening for events from the supergraph change stream.
subgraph_config_watchers_subtask.run(supergraph_config_stream);

// Start the supergraph watcher subtask.
if let Some(supergraph_config_subtask) = supergraph_config_subtask {
supergraph_config_subtask.run();
}
Expand All @@ -84,6 +128,10 @@ impl Runner {
)> {
let supergraph_config: SupergraphConfig = self.supergraph_config.clone().into();

// If the supergraph config was passed as a file, we can configure a watcher for change
// events.
// We could return None here if we received a supergraph config directly from stdin. In
// that case, we don't want to configure a watcher.
if let Some(origin_path) = self.supergraph_config.origin_path() {
let f = FileWatcher::new(origin_path.clone());
let watcher = SupergraphConfigWatcher::new(f, supergraph_config.clone());
Expand All @@ -105,6 +153,7 @@ struct SubgraphWatchers {
}

impl SubgraphWatchers {
/// Create a set of watchers from the subgraph definitions of a supergraph config.
pub fn new(supergraph_config: SupergraphConfig) -> SubgraphWatchers {
let watchers = supergraph_config
.into_iter()
Expand All @@ -130,6 +179,11 @@ impl SubtaskHandleStream for SubgraphWatchers {
) -> AbortHandle {
tokio::task::spawn(async move {
let mut abort_handles: HashMap<String, (AbortHandle, AbortHandle)> = HashMap::new();
// Start a background task for each of the subtask watchers that listens for change
// events and send each event to the parent sender to be consumed by the composition
// handler.
// We also collect the abort handles for each background task in order to gracefully
// shut down.
for (subgraph_name, (mut messages, subtask)) in self.watchers.into_iter() {
let sender = sender.clone();
let messages_abort_handle = tokio::task::spawn(async move {
Expand All @@ -144,9 +198,10 @@ impl SubtaskHandleStream for SubgraphWatchers {
abort_handles.insert(subgraph_name, (messages_abort_handle, subtask_abort_handle));
}

// for supergraph diff events
// Wait for supergraph diff events received from the input stream.
while let Some(diff) = input.next().await {
// for new subgraphs added to the session
// If we detect additional diffs, start a new subgraph subtask.
// Adding the abort handle to the currentl collection of handles.
for (name, subgraph_config) in diff.added() {
if let Ok((mut messages, subtask)) =
SubgraphWatcher::try_from(subgraph_config.schema.clone())
Expand Down Expand Up @@ -174,6 +229,7 @@ impl SubtaskHandleStream for SubgraphWatchers {
);
}
}
// If we detect removal diffs, stop the subtask for the removed subgraph.
for name in diff.removed() {
if let Some((messages_abort_handle, subtask_abort_handle)) =
abort_handles.get(name)
Expand Down
2 changes: 1 addition & 1 deletion src/composition/watchers/subtask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use tokio::{
};
use tokio_stream::wrappers::UnboundedReceiverStream;

/// A trait whose implementation will be able send events
/// A trait whose implementation will be able to send events
pub trait SubtaskHandleUnit {
type Output;
fn handle(self, sender: UnboundedSender<Self::Output>) -> AbortHandle;
Expand Down
Loading