Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add subgraph watcher handle tests #2172

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 49 additions & 11 deletions src/composition/watchers/watcher/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ pub enum SubgraphWatcherKind {
/// Don't ever update, schema is only pulled once.
// TODO: figure out what to do with this; is it ever used? can we remove it?
_Once(String),

/// Event specifically used for testing watch handlers.
#[cfg(test)]
TestWatcher,
}

impl TryFrom<SchemaSource> for SubgraphWatcher {
Expand All @@ -51,13 +55,9 @@ impl TryFrom<SchemaSource> for SubgraphWatcher {
// directives from introspection (but not when the source is a file)
fn try_from(schema_source: SchemaSource) -> Result<Self, Self::Error> {
match schema_source {
SchemaSource::File { file } => {
println!("wtf?");

Ok(Self {
watcher: SubgraphWatcherKind::File(FileWatcher::new(file)),
})
}
SchemaSource::File { file } => Ok(Self {
watcher: SubgraphWatcherKind::File(FileWatcher::new(file)),
}),
SchemaSource::SubgraphIntrospection {
subgraph_url,
introspection_headers,
Expand All @@ -74,16 +74,27 @@ impl TryFrom<SchemaSource> for SubgraphWatcher {
}

impl SubgraphWatcherKind {
/// Watch the subgraph for changes based on the kind of watcher attached
/// Watch a subgraph for changes based on the kind of watcher attached.
///
/// Development note: this is a stream of Strings, but in the future we might want something
/// more flexible to get type safety
/// more flexible to get type safety.
async fn watch(&self) -> Pin<Box<dyn Stream<Item = String> + Send>> {
match self {
Self::File(file_watcher) => file_watcher.clone().watch(),
Self::Introspect(introspection) => introspection.watch(),
// TODO: figure out what this is; sdl? stdin one-off? either way, probs not watching
Self::_Once(_) => unimplemented!(),

// Create a new single buffered channel for testing watch events.
#[cfg(test)]
Self::TestWatcher => {
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;

let (tx, rx) = channel(1);
tx.send("watch event".to_string()).await.unwrap();
ReceiverStream::new(rx).boxed()
}
}
}
}
Expand Down Expand Up @@ -148,7 +159,8 @@ impl SubgraphIntrospection {
}
}

/// A unit struct denoting a change to a subgraph, used by composition to know whether to recompose
/// A unit struct denoting a change to a subgraph, used by composition to know whether to
/// recompose.
pub struct SubgraphChanged;

impl SubtaskHandleUnit for SubgraphWatcher {
Expand All @@ -157,7 +169,7 @@ impl SubtaskHandleUnit for SubgraphWatcher {
fn handle(self, sender: UnboundedSender<Self::Output>) -> AbortHandle {
tokio::spawn(async move {
let mut watcher = self.watcher.watch().await;
while let Some(_change) = watcher.next().await {
while watcher.next().await.is_some() {
let _ = sender
.send(SubgraphChanged)
.tap_err(|err| tracing::error!("{:?}", err));
Expand All @@ -166,3 +178,29 @@ impl SubtaskHandleUnit for SubgraphWatcher {
.abort_handle()
}
}

#[cfg(test)]
mod tests {
use futures::StreamExt;

use crate::composition::watchers::subtask::{Subtask, SubtaskRunUnit};

use super::{SubgraphChanged, SubgraphWatcher, SubgraphWatcherKind};

#[tokio::test]
async fn test_subgraphwatcher_handle() {
let watch_handler = SubgraphWatcher {
watcher: SubgraphWatcherKind::TestWatcher,
};

let (mut watch_messages, watch_subtask) = Subtask::new(watch_handler);
let abort_handle = watch_subtask.run();

assert!(matches!(
watch_messages.next().await.unwrap(),
SubgraphChanged
));

abort_handle.abort();
}
}
Loading