Skip to content

Commit

Permalink
ROVER-280 Attempt to properly propagate cancellation tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanrainer committed Jan 20, 2025
1 parent 6be7af9 commit 3145bae
Show file tree
Hide file tree
Showing 16 changed files with 365 additions and 291 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license-file = "./LICENSE"
name = "rover"
readme = "README.md"
repository = "https://github.com/apollographql/rover/"
version = "0.26.3"
version = "0.27.0-preview.1"
default-run = "rover"

publish = false
Expand Down
7 changes: 4 additions & 3 deletions crates/rover-std/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,9 @@ impl Fs {
pub fn watch_file(
path: PathBuf,
tx: UnboundedSender<Result<(), RoverStdError>>,
cancellation_token: Option<CancellationToken>,
) -> CancellationToken {
let cancellation_token = CancellationToken::new();
let cancellation_token = cancellation_token.unwrap_or_default();

let poll_watcher = PollWatcher::new(
{
Expand Down Expand Up @@ -452,7 +453,7 @@ mod tests {
let path = file.path().to_path_buf();
let (tx, rx) = unbounded_channel();
let rx = Arc::new(Mutex::new(rx));
let cancellation_token = Fs::watch_file(path.clone(), tx);
let cancellation_token = Fs::watch_file(path.clone(), tx, None);

sleep(Duration::from_millis(1500)).await;

Expand Down Expand Up @@ -513,7 +514,7 @@ mod tests {
let (tx, rx) = unbounded_channel();
let rx = Arc::new(Mutex::new(rx));

let _cancellation_token = Fs::watch_file(path.clone(), tx);
let _cancellation_token = Fs::watch_file(path.clone(), tx, None);

sleep(Duration::from_millis(1500)).await;

Expand Down
5 changes: 2 additions & 3 deletions src/command/dev/legacy/router/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use std::{
use anyhow::{anyhow, Context};
use camino::Utf8PathBuf;
use crossbeam_channel::{unbounded, Receiver};
use serde_json::json;

use rover_std::{warnln, Fs};
use serde_json::json;

use crate::utils::expansion::expand;
use crate::{
Expand Down Expand Up @@ -269,7 +268,7 @@ impl RouterConfigReader {
let (raw_tx, mut raw_rx) = tokio::sync::mpsc::unbounded_channel();
let (state_tx, state_rx) = unbounded();
let input_config_path: PathBuf = input_config_path.as_path().into();
Fs::watch_file(input_config_path, raw_tx);
Fs::watch_file(input_config_path, raw_tx, None);
tokio::spawn(async move {
while let Some(res) = raw_rx.recv().await {
res.expect("could not watch router configuration file");
Expand Down
7 changes: 3 additions & 4 deletions src/command/dev/legacy/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ use anyhow::{anyhow, Context};
use apollo_federation_types::javascript::SubgraphDefinition;
use camino::{Utf8Path, Utf8PathBuf};
use reqwest::Client;
use tokio::time::MissedTickBehavior::Delay;
use url::Url;

use rover_client::blocking::StudioClient;
use rover_client::operations::subgraph::fetch;
use rover_client::operations::subgraph::fetch::SubgraphFetchInput;
use rover_client::shared::GraphRef;
use rover_std::{errln, Fs};
use tokio::time::MissedTickBehavior::Delay;
use url::Url;

use crate::{
command::dev::legacy::{
Expand Down Expand Up @@ -305,7 +304,7 @@ impl SubgraphSchemaWatcher {

let watch_path: PathBuf = path.as_path().into();

Fs::watch_file(watch_path, tx);
Fs::watch_file(watch_path, tx, None);

while let Some(res) = rx.recv().await {
match res {
Expand Down
9 changes: 5 additions & 4 deletions src/command/dev/next/router/watchers/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl FileWatcher {
let path = self.path;
let (file_tx, file_rx) = unbounded_channel();
let output = UnboundedReceiverStream::new(file_rx);
let cancellation_token = Fs::watch_file(path.as_path().into(), file_tx);
let cancellation_token = Fs::watch_file(path.as_path().into(), file_tx, None);

output
.filter_map(move |result| {
Expand Down Expand Up @@ -61,13 +61,14 @@ impl FileWatcher {

#[cfg(test)]
mod tests {
use futures::StreamExt;
use speculoos::assert_that;
use speculoos::option::OptionAssertions;
use std::fs::OpenOptions;
use std::io::Write;
use std::time::Duration;

use futures::StreamExt;
use speculoos::assert_that;
use speculoos::option::OptionAssertions;

use super::*;

#[tokio::test]
Expand Down
10 changes: 8 additions & 2 deletions src/composition/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
use camino::Utf8PathBuf;
use futures::stream::{select, BoxStream, StreamExt};
use rover_http::HttpService;
use tokio_util::sync::CancellationToken;
use tower::ServiceExt;

use self::state::SetupSubgraphWatchers;
Expand Down Expand Up @@ -220,7 +221,10 @@ where
// events in order to trigger recomposition.
let (composition_messages, composition_subtask) =
Subtask::new(self.state.composition_watcher);
composition_subtask.run(select(subgraph_change_stream, federation_watcher_stream).boxed());
composition_subtask.run(
select(subgraph_change_stream, federation_watcher_stream).boxed(),
None,
);

// Start subgraph watchers, listening for events from the supergraph change stream.
subgraph_watcher_subtask.run(
Expand All @@ -235,6 +239,7 @@ where
}
})
.boxed(),
Some(CancellationToken::new()),
);

federation_watcher_subtask.run(
Expand All @@ -249,11 +254,12 @@ where
}
})
.boxed(),
None,
);

// Start the supergraph watcher subtask.
if let Some(supergraph_config_subtask) = supergraph_config_subtask {
supergraph_config_subtask.run();
supergraph_config_subtask.run(None);
}

composition_messages.boxed()
Expand Down
71 changes: 37 additions & 34 deletions src/composition/watchers/composition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use camino::Utf8PathBuf;
use futures::stream::BoxStream;
use rover_std::{errln, infoln, warnln};
use tap::TapFallible;
use tokio::{sync::mpsc::UnboundedSender, task::AbortHandle};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::error;

use crate::composition::supergraph::install::InstallSupergraph;
Expand Down Expand Up @@ -68,38 +69,39 @@ where
mut self,
sender: UnboundedSender<Self::Output>,
mut input: BoxStream<'static, Self::Input>,
) -> AbortHandle {
tokio::task::spawn({
cancellation_token: Option<CancellationToken>,
) {
tokio::task::spawn(async move {
let mut supergraph_config = self.supergraph_config.clone();
let target_file = self.temp_dir.join("supergraph.yaml");
async move {
if self.compose_on_initialisation {
if let Err(err) = self
.setup_temporary_supergraph_yaml(&supergraph_config, &target_file)
.await
{
error!("Could not setup initial supergraph schema: {}", err);
};
let _ = sender
.send(CompositionEvent::Started)
.tap_err(|err| error!("{:?}", err));
let output = self
.run_composition(&target_file, &self.output_target)
.await;
match output {
Ok(success) => {
let _ = sender
.send(CompositionEvent::Success(success))
.tap_err(|err| error!("{:?}", err));
}
Err(err) => {
let _ = sender
.send(CompositionEvent::Error(err))
.tap_err(|err| error!("{:?}", err));
}
if self.compose_on_initialisation {
if let Err(err) = self
.setup_temporary_supergraph_yaml(&supergraph_config, &target_file)
.await
{
error!("Could not setup initial supergraph schema: {}", err);
};
let _ = sender
.send(CompositionEvent::Started)
.tap_err(|err| error!("{:?}", err));
let output = self
.run_composition(&target_file, &self.output_target)
.await;
match output {
Ok(success) => {
let _ = sender
.send(CompositionEvent::Success(success))
.tap_err(|err| error!("{:?}", err));
}
Err(err) => {
let _ = sender
.send(CompositionEvent::Error(err))
.tap_err(|err| error!("{:?}", err));
}
}

}
let cancellation_token = cancellation_token.unwrap_or_default();
cancellation_token.run_until_cancelled(async {
while let Some(event) = input.next().await {
match event {
Subgraph(SubgraphEvent::SubgraphChanged(subgraph_schema_changed)) => {
Expand Down Expand Up @@ -193,9 +195,8 @@ where
}
}
}
}
})
.abort_handle()
}).await;
});
}
}

Expand Down Expand Up @@ -273,6 +274,7 @@ mod tests {
use rstest::rstest;
use semver::Version;
use speculoos::prelude::*;
use tokio_util::sync::CancellationToken;
use tracing_test::traced_test;

use super::{CompositionInputEvent, CompositionWatcher};
Expand Down Expand Up @@ -383,7 +385,8 @@ mod tests {
})
.boxed();
let (mut composition_messages, composition_subtask) = Subtask::new(composition_handler);
let abort_handle = composition_subtask.run(subgraph_change_events);
let cancellation_token = CancellationToken::new();
composition_subtask.run(subgraph_change_events, Some(cancellation_token.clone()));

// Assert we always get a subgraph added event.
let next_message = composition_messages.next().await;
Expand Down Expand Up @@ -420,7 +423,7 @@ mod tests {
));
}

abort_handle.abort();
cancellation_token.cancel().await;
Ok(())
}
}
62 changes: 36 additions & 26 deletions src/composition/watchers/federation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::stream::BoxStream;
use futures::StreamExt;
use tap::TapFallible;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::AbortHandle;
use tokio_util::sync::CancellationToken;
use tracing::error;

use crate::composition::events::CompositionEvent;
Expand All @@ -25,32 +25,42 @@ impl SubtaskHandleStream for FederationWatcher {
self,
sender: UnboundedSender<Self::Output>,
mut input: BoxStream<'static, Self::Input>,
) -> AbortHandle {
tokio::task::spawn(async move {
while let Some(recv_res) = input.next().await {
match recv_res {
Ok(diff) => {
if let Some(fed_version) = diff.federation_version() {
let _ = sender
.send(CompositionInputEvent::Federation(
fed_version.clone().unwrap_or(LatestFedTwo),
))
.tap_err(|err| error!("{:?}", err));
cancellation_token: Option<CancellationToken>,
) {
let cancellation_token = cancellation_token.unwrap_or_default();
tokio::spawn(async move {
let cancellation_token = cancellation_token.clone();
cancellation_token
.run_until_cancelled(async move {
while let Some(recv_res) = input.next().await {
match recv_res {
Ok(diff) => {
if let Some(fed_version) = diff.federation_version() {
let _ = sender
.send(CompositionInputEvent::Federation(
fed_version.clone().unwrap_or(LatestFedTwo),
))
.tap_err(|err| error!("{:?}", err));
}
}
Err(SupergraphConfigSerialisationError::DeserializingConfigError {
source,
}) => {
let _ = sender
.send(CompositionInputEvent::Passthrough(
CompositionEvent::Error(
CompositionError::InvalidSupergraphConfig(
source.message(),
),
),
))
.tap_err(|err| error!("{:?}", err));
}
Err(_) => {}
}
}
Err(SupergraphConfigSerialisationError::DeserializingConfigError {
source,
}) => {
let _ = sender
.send(CompositionInputEvent::Passthrough(CompositionEvent::Error(
CompositionError::InvalidSupergraphConfig(source.message()),
)))
.tap_err(|err| error!("{:?}", err));
}
Err(_) => {}
}
}
})
.abort_handle()
})
.await;
});
}
}
Loading

0 comments on commit 3145bae

Please sign in to comment.