Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wire-in supergraph config resolver #2146

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/command/supergraph/compose/do_compose.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::env::current_dir;
use std::{fs::File, io::Write, process::Command, str};

use anyhow::{anyhow, Context};
Expand All @@ -13,10 +14,12 @@ use derive_getters::Getters;
use semver::Version;
use serde::Serialize;
use std::io::Read;
use tempfile::NamedTempFile;

use rover_client::shared::GraphRef;
use rover_client::RoverClientError;

use crate::composition::supergraph::config::SupergraphConfigResolver;
use crate::options::ProfileOpt;
use crate::utils::supergraph_config::get_supergraph_config;
use crate::utils::{client::StudioClientConfig, parsers::FileDescriptorType};
Expand Down Expand Up @@ -126,11 +129,41 @@ impl Compose {
) -> RoverResult<RoverOutput> {
#[cfg(debug_assertions)]
if self.opts.watch {
let mut runner = crate::composition::runner::Runner::new(&client_config, &self.opts);
let profile = ProfileOpt {
profile_name: "default".to_string(),
};
runner.run(&profile).await;
let supergraph_config_root = if let Some(FileDescriptorType::File(file_path)) =
&self.opts.supergraph_config_source.supergraph_yaml
{
file_path
.parent()
.ok_or_else(|| {
anyhow!("Could not get the parent directory of ({})", file_path)
})?
.to_path_buf()
} else {
Utf8PathBuf::try_from(current_dir()?)?
};
let studio_client = client_config.get_authenticated_client(&profile)?;
let internal_supergraph_config_path =
Utf8PathBuf::from_path_buf(NamedTempFile::new()?.into_temp_path().to_path_buf())
.map_err(|err| {
anyhow!("Unable to convert PathBuf ({:?}) to Utf8PathBuf", err)
})?;
let supergraph_config = SupergraphConfigResolver::new()
.load_from_file_descriptor(
self.opts.supergraph_config_source.supergraph_yaml.as_ref(),
)?
.load_remote_subgraphs(
&studio_client,
self.opts.supergraph_config_source.graph_ref.as_ref(),
)
.await?
.lazily_resolve_subgraphs(&supergraph_config_root)
.await?
.write(internal_supergraph_config_path)?;
let runner = crate::composition::runner::Runner::new(supergraph_config);
runner.run().await?;
return Ok(RoverOutput::EmptySuccess);
}
let mut supergraph_config = get_supergraph_config(
Expand Down
2 changes: 1 addition & 1 deletion src/command/supergraph/compose/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub(crate) use no_compose::Compose;
mod do_compose;

#[cfg(feature = "composition-js")]
pub(crate) use do_compose::{Compose, SupergraphComposeOpts};
pub(crate) use do_compose::Compose;

use apollo_federation_types::build::BuildHint;

Expand Down
2 changes: 0 additions & 2 deletions src/composition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
watcher::{router_config::RouterConfigMessage, supergraph_config::SupergraphConfigDiff},
};

use self::supergraph::config::FinalSupergraphConfig;

pub mod events;
pub mod runner;
pub mod supergraph;
Expand Down Expand Up @@ -55,19 +53,19 @@
}

// NB: this is where we'll contain the logic for kicking off watchers
struct Composition {

Check warning on line 56 in src/composition/mod.rs

View workflow job for this annotation

GitHub Actions / Build Rover for macOS x86-64

struct `Composition` is never constructed
supergraph_binary: SupergraphBinary,
supergraph_config_events: Option<InputEvent>,
router_config_events: Option<InputEvent>,
}

enum InputEvent {

Check warning on line 62 in src/composition/mod.rs

View workflow job for this annotation

GitHub Actions / Build Rover for macOS x86-64

enum `InputEvent` is never used
SupergraphConfig(BoxStream<'static, SupergraphConfigDiff>),
RouterConfig(BoxStream<'static, RouterConfigMessage>),
}

impl Composition {
fn new(supergraph_binary: SupergraphBinary) -> Self {

Check warning on line 68 in src/composition/mod.rs

View workflow job for this annotation

GitHub Actions / Build Rover for macOS x86-64

associated items `new`, `with_supergraph_config_events`, `with_router_config_events`, and `watch` are never used
Self {
supergraph_binary,
supergraph_config_events: None,
Expand Down Expand Up @@ -107,7 +105,7 @@
}
}

struct WatchResultBetterName {

Check warning on line 108 in src/composition/mod.rs

View workflow job for this annotation

GitHub Actions / Build Rover for macOS x86-64

struct `WatchResultBetterName` is never constructed
abort_handle: AbortHandle,
composition_events: UnboundedReceiverStream<CompositionEvent>,
}
Expand Down Expand Up @@ -135,7 +133,7 @@
InputEvent::SupergraphConfig(mut events) => {
while let Some(event) = events.next().await {
sender.send(CompositionEvent::Started);
let current_supergraph_config = event.current();

Check warning on line 136 in src/composition/mod.rs

View workflow job for this annotation

GitHub Actions / Build Rover for macOS x86-64

unused variable: `current_supergraph_config`

// TODO: write current_supergraph_config to a path
match self.supergraph_binary.compose(&Utf8PathBuf::new()).await {
Expand Down
57 changes: 11 additions & 46 deletions src/composition/runner.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use anyhow::anyhow;
use apollo_federation_types::config::SupergraphConfig;
use futures::stream::StreamExt;
use rover_std::errln;
use tokio::task::JoinHandle;

use crate::{
command::supergraph::compose::SupergraphComposeOpts,
composition::watchers::{
subtask::{Subtask, SubtaskRunUnit},
watcher::{
Expand All @@ -14,53 +12,38 @@ use crate::{
supergraph_config::SupergraphConfigWatcher,
},
},
options::ProfileOpt,
utils::{client::StudioClientConfig, supergraph_config::get_supergraph_config},
RoverError, RoverResult,
RoverResult,
};

use super::supergraph::config::FinalSupergraphConfig;

// TODO: handle retry flag for subgraphs (see rover dev help)
pub struct Runner {
client_config: StudioClientConfig,
supergraph_compose_opts: SupergraphComposeOpts,
supergraph_config: FinalSupergraphConfig,
}

impl Runner {
pub fn new(
client_config: &StudioClientConfig,
supergraph_compose_opts: &SupergraphComposeOpts,
) -> Self {
pub fn new(final_supergraph_config: FinalSupergraphConfig) -> Self {
Self {
client_config: client_config.clone(),
supergraph_compose_opts: supergraph_compose_opts.clone(),
supergraph_config: final_supergraph_config,
}
}

pub async fn run(&mut self, profile: &ProfileOpt) -> RoverResult<()> {
let supergraph_config = self.load_supergraph_config(profile).await?;

pub async fn run(&self) -> RoverResult<()> {
// Start supergraph and subgraph watchers.
let handles = self.start_config_watchers(supergraph_config.clone());
let handles = self.start_config_watchers();

futures::future::join_all(handles).await;

Ok(())
}

fn start_config_watchers(&self, supergraph_config: SupergraphConfig) -> Vec<JoinHandle<()>> {
fn start_config_watchers(&self) -> Vec<JoinHandle<()>> {
let supergraph_config: SupergraphConfig = self.supergraph_config.clone().into();
let mut futs = vec![];

// Create a new supergraph config file watcher.
let f = FileWatcher::new(
self.supergraph_compose_opts
.supergraph_config_source()
.supergraph_yaml()
.as_ref()
.unwrap()
.to_path_buf()
.unwrap()
.clone(),
);
let f = FileWatcher::new(self.supergraph_config.path().clone());
let watcher = SupergraphConfigWatcher::new(f, supergraph_config.clone());

// Create and run the file watcher in a sub task.
Expand Down Expand Up @@ -101,22 +84,4 @@ impl Runner {

futs
}

async fn load_supergraph_config(&self, profile: &ProfileOpt) -> RoverResult<SupergraphConfig> {
get_supergraph_config(
self.supergraph_compose_opts
.supergraph_config_source()
.graph_ref(),
self.supergraph_compose_opts
.supergraph_config_source()
.supergraph_yaml(),
self.supergraph_compose_opts.federation_version().as_ref(),
self.client_config.clone(),
profile,
false,
)
.await
.map_err(|err| RoverError::new(anyhow!("error loading supergraph config: {err}")))?
.ok_or_else(|| RoverError::new(anyhow!("Why is supergraph config None?")))
}
}
3 changes: 1 addition & 2 deletions src/composition/supergraph/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
};
use async_trait::async_trait;
use camino::Utf8PathBuf;
use derive_getters::Getters;
use tap::TapFallible;

use crate::{
composition::{CompositionError, CompositionSuccess},
utils::effect::{exec::ExecCommand, read_file::ReadFile},
};

use super::{config::FinalSupergraphConfig, version::SupergraphVersion};
use super::version::SupergraphVersion;

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum OutputTarget {
Expand Down Expand Up @@ -73,7 +72,7 @@

// TODO:
impl From<rover_std::RoverStdError> for CompositionError {
fn from(value: rover_std::RoverStdError) -> Self {

Check warning on line 75 in src/composition/supergraph/binary.rs

View workflow job for this annotation

GitHub Actions / Build Rover for macOS x86-64

unused variable: `value`
todo!()
}
}
Expand All @@ -85,7 +84,7 @@
}

impl SupergraphBinary {
fn new(exe: Utf8PathBuf, version: SupergraphVersion, output_target: OutputTarget) -> Self {

Check warning on line 87 in src/composition/supergraph/binary.rs

View workflow job for this annotation

GitHub Actions / Build Rover for macOS x86-64

associated function `new` is never used
Self {
exe,
version: version.clone(),
Expand Down
42 changes: 21 additions & 21 deletions src/composition/supergraph/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ mod state {

use state::LoadSupergraphConfig;

pub struct IntermediateSupergraphConfig<State> {
pub struct SupergraphConfigResolver<State> {
state: State,
}

impl<T> IntermediateSupergraphConfig<T> {
pub fn new() -> IntermediateSupergraphConfig<LoadSupergraphConfig> {
IntermediateSupergraphConfig {
impl SupergraphConfigResolver<LoadSupergraphConfig> {
pub fn new() -> SupergraphConfigResolver<LoadSupergraphConfig> {
SupergraphConfigResolver {
state: LoadSupergraphConfig,
}
}
Expand All @@ -61,11 +61,11 @@ pub enum LoadSupergraphConfigError {
ReadFileDescriptor(RoverError),
}

impl IntermediateSupergraphConfig<LoadSupergraphConfig> {
impl SupergraphConfigResolver<LoadSupergraphConfig> {
pub fn load_from_file_descriptor(
self,
file_descriptor_type: Option<FileDescriptorType>,
) -> Result<IntermediateSupergraphConfig<LoadRemoteSubgraphs>, LoadSupergraphConfigError> {
file_descriptor_type: Option<&FileDescriptorType>,
) -> Result<SupergraphConfigResolver<LoadRemoteSubgraphs>, LoadSupergraphConfigError> {
if let Some(file_descriptor_type) = file_descriptor_type {
let supergraph_config = file_descriptor_type
.read_file_descriptor("supergraph config", &mut std::io::stdin())
Expand All @@ -74,13 +74,13 @@ impl IntermediateSupergraphConfig<LoadSupergraphConfig> {
SupergraphConfig::new_from_yaml(&contents)
.map_err(LoadSupergraphConfigError::SupergraphConfig)
})?;
Ok(IntermediateSupergraphConfig {
Ok(SupergraphConfigResolver {
state: LoadRemoteSubgraphs {
supergraph_config: Some(supergraph_config),
},
})
} else {
Ok(IntermediateSupergraphConfig {
Ok(SupergraphConfigResolver {
state: LoadRemoteSubgraphs {
supergraph_config: None,
},
Expand All @@ -92,23 +92,23 @@ impl IntermediateSupergraphConfig<LoadSupergraphConfig> {
#[derive(thiserror::Error, Debug)]
pub enum LoadRemoteSubgraphsError {
#[error(transparent)]
FetchRemoteSubgraphsError(Box<dyn std::error::Error>),
FetchRemoteSubgraphsError(Box<dyn std::error::Error + Send + Sync>),
}

impl IntermediateSupergraphConfig<LoadRemoteSubgraphs> {
impl SupergraphConfigResolver<LoadRemoteSubgraphs> {
pub async fn load_remote_subgraphs(
self,
fetch_remote_subgraphs_impl: &impl FetchRemoteSubgraphs,
graph_ref: Option<&GraphRef>,
) -> Result<IntermediateSupergraphConfig<ResolveSubgraphs>, LoadRemoteSubgraphsError> {
) -> Result<SupergraphConfigResolver<ResolveSubgraphs>, LoadRemoteSubgraphsError> {
if let Some(graph_ref) = graph_ref {
let remote_supergraph_config = fetch_remote_subgraphs_impl
.fetch_remote_subgraphs(graph_ref)
.await
.map_err(|err| {
LoadRemoteSubgraphsError::FetchRemoteSubgraphsError(Box::new(err))
})?;
Ok(IntermediateSupergraphConfig {
Ok(SupergraphConfigResolver {
state: ResolveSubgraphs {
supergraph_config: self
.state
Expand All @@ -121,7 +121,7 @@ impl IntermediateSupergraphConfig<LoadRemoteSubgraphs> {
},
})
} else {
Ok(IntermediateSupergraphConfig {
Ok(SupergraphConfigResolver {
state: ResolveSubgraphs {
supergraph_config: self.state.supergraph_config,
},
Expand All @@ -138,13 +138,13 @@ pub enum ResolveSupergraphConfigError {
ResolveSubgraphs(Vec<ResolveSubgraphError>),
}

impl IntermediateSupergraphConfig<ResolveSubgraphs> {
impl SupergraphConfigResolver<ResolveSubgraphs> {
pub async fn fully_resolve_subgraphs<CTX>(
self,
introspect_subgraph_impl: &impl IntrospectSubgraph,
fetch_remote_subgraph_impl: &impl FetchRemoteSubgraph,
supergraph_config_root: &Utf8PathBuf,
) -> Result<IntermediateSupergraphConfig<Writing>, ResolveSupergraphConfigError>
) -> Result<SupergraphConfigResolver<Writing>, ResolveSupergraphConfigError>
where
CTX: IntrospectSubgraph + FetchRemoteSubgraph,
{
Expand All @@ -161,7 +161,7 @@ impl IntermediateSupergraphConfig<ResolveSubgraphs> {
)
.await
.map_err(ResolveSupergraphConfigError::ResolveSubgraphs)?;
Ok(IntermediateSupergraphConfig {
Ok(SupergraphConfigResolver {
state: Writing {
supergraph_config: resolved_supergraph_config.into(),
},
Expand All @@ -174,7 +174,7 @@ impl IntermediateSupergraphConfig<ResolveSubgraphs> {
pub async fn lazily_resolve_subgraphs(
self,
supergraph_config_root: &Utf8PathBuf,
) -> Result<IntermediateSupergraphConfig<Writing>, ResolveSupergraphConfigError> {
) -> Result<SupergraphConfigResolver<Writing>, ResolveSupergraphConfigError> {
match self.state.supergraph_config {
Some(supergraph_config) => {
let unresolved_supergraph_config =
Expand All @@ -186,7 +186,7 @@ impl IntermediateSupergraphConfig<ResolveSubgraphs> {
)
.await
.map_err(ResolveSupergraphConfigError::ResolveSubgraphs)?;
Ok(IntermediateSupergraphConfig {
Ok(SupergraphConfigResolver {
state: Writing {
supergraph_config: resolved_supergraph_config.into(),
},
Expand All @@ -205,7 +205,7 @@ pub enum WriteSupergraphConfigError {
Fs(RoverStdError),
}

impl IntermediateSupergraphConfig<Writing> {
impl SupergraphConfigResolver<Writing> {
pub fn write(
self,
path: Utf8PathBuf,
Expand All @@ -219,7 +219,7 @@ impl IntermediateSupergraphConfig<Writing> {
}
}

#[derive(Getters)]
#[derive(Clone, Debug, Getters)]
pub struct FinalSupergraphConfig {
path: Utf8PathBuf,
#[getter(skip)]
Expand Down
2 changes: 1 addition & 1 deletion src/composition/supergraph/config/remote_subgraphs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct MockFetchRemoteSubgraphsError {}
#[cfg_attr(test, mockall::automock(type Error = MockFetchRemoteSubgraphsError;))]
#[async_trait]
pub trait FetchRemoteSubgraphs {
type Error: std::error::Error + 'static;
type Error: std::error::Error + Send + Sync + 'static;
async fn fetch_remote_subgraphs(
&self,
graph_ref: &GraphRef,
Expand Down
8 changes: 4 additions & 4 deletions src/composition/supergraph/config/resolve/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ pub enum ResolveSubgraphError {
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Fs(Box<dyn std::error::Error>),
Fs(Box<dyn std::error::Error + Send + Sync>),
#[error("Failed to introspect the subgraph {name}.")]
IntrospectionError {
name: String,
error: Box<dyn std::error::Error>,
error: Box<dyn std::error::Error + Send + Sync>,
},
#[error("Invalid graph ref: {graph_ref}")]
InvalidGraphRef {
graph_ref: String,
error: Box<dyn std::error::Error>,
error: Box<dyn std::error::Error + Send + Sync>,
},
#[error("Failed to fetch the sdl for subgraph `{name}` from remote")]
FetchRemoteSdlError {
name: String,
error: Box<dyn std::error::Error>,
error: Box<dyn std::error::Error + Send + Sync>,
},
#[error(
"The subgraph `{name}` with graph ref `{graph_ref}` does not have an assigned routing url"
Expand Down
1 change: 0 additions & 1 deletion src/composition/watchers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
mod handler;
pub(crate) mod messages;
mod run;
pub(crate) mod subtask;
pub mod watcher;

Expand Down
Loading
Loading