Skip to content

Commit

Permalink
feat: add cloud workflow connectivity (#332)
Browse files Browse the repository at this point in the history
  • Loading branch information
morgante authored May 14, 2024
1 parent a39358e commit 0c81511
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 12 deletions.
1 change: 1 addition & 0 deletions crates/auth/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::env;

use crate::{info::AuthInfo, testing::get_testing_auth_info};

pub static ENV_VAR_GRIT_LOCAL_SERVER: &str = "GRIT_LOCAL_SERVER";
pub static ENV_VAR_GRIT_AUTH_TOKEN: &str = "GRIT_AUTH_TOKEN";
pub static ENV_VAR_GRIT_API_URL: &str = "GRIT_API_URL";
pub static DEFAULT_GRIT_API_URL: &str = "https://api-gateway-prod-6et7uue.uc.gateway.dev";
Expand Down
8 changes: 7 additions & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ default = [
"updater",
"workflows_v2",
"grit_tracing",
# Grit cloud feature for relaying workflow results
# "workflow_server",
# "remote_workflows",
# "server",
# "remote_redis",
Expand All @@ -102,6 +104,9 @@ remote_pubsub = []
remote_workflows = [
"dep:grit_cloud_client",
]
workflow_server = [
"dep:grit_cloud_client",
]
server = [
"workflows_v2",
"external_functions",
Expand Down Expand Up @@ -138,7 +143,8 @@ grit_beta = [
"updater",
"ai_builtins",
"workflows_v2",
"remote_workflows"
"remote_workflows",
"workflow_server"
# "grit_timing",
]
grit_timing = []
1 change: 0 additions & 1 deletion crates/cli/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,6 @@ impl Updater {
pub async fn get_app_bin_and_install(&mut self, app: SupportedApp) -> Result<PathBuf> {
// If the path is overridden, skip checking install
if let Some(bin_path) = self.get_env_bin(&app)? {
info!("Using {} from: {}", app, bin_path.display());
return Ok(bin_path);
}
let bin_path = self.get_app_bin(&app)?;
Expand Down
33 changes: 28 additions & 5 deletions crates/cli/src/workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{bail, Result};
use console::style;
use grit_util::FileRange;
use log::debug;
use marzano_auth::env::ENV_VAR_GRIT_AUTH_TOKEN;
use marzano_auth::env::{get_grit_api_url, ENV_VAR_GRIT_API_URL, ENV_VAR_GRIT_AUTH_TOKEN};
use marzano_gritmodule::{fetcher::LocalRepo, searcher::find_grit_dir_from};
use marzano_messenger::{emit::Messager, workflows::PackagedWorkflowOutcome};
use serde::Serialize;
Expand Down Expand Up @@ -53,7 +53,7 @@ pub async fn run_bin_workflow<M>(
mut arg: WorkflowInputs,
) -> Result<(M, PackagedWorkflowOutcome)>
where
M: Messager,
M: Messager + Send + 'static,
{
let cwd = std::env::current_dir()?;

Expand All @@ -64,6 +64,15 @@ where
let mut updater = Updater::from_current_bin().await?;
let repo = LocalRepo::from_dir(&cwd).await;

#[cfg(feature = "workflow_server")]
let (server_addr, handle, shutdown_tx) = {
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let socket = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let server_addr = format!("http://{}", socket.local_addr()?);
let handle = grit_cloud_client::spawn_server_tasks(emitter, shutdown_rx, socket);
(server_addr, handle, shutdown_tx)
};

let root = std::env::var(ENV_GRIT_WORKSPACE_ROOT).unwrap_or_else(|_| {
repo.as_ref().and_then(|r| r.root().ok()).map_or_else(
|| cwd.to_string_lossy().into_owned(),
Expand Down Expand Up @@ -128,9 +137,16 @@ where
}
};

let mut child = Command::new(runner_path)
let mut child = Command::new(runner_path);
child
.arg(tempfile_path.to_string_lossy().to_string())
.env("GRIT_MARZANO_PATH", marzano_bin)
.env("GRIT_MARZANO_PATH", marzano_bin);

#[cfg(feature = "workflow_server")]
child.env(marzano_auth::env::ENV_VAR_GRIT_LOCAL_SERVER, &server_addr);

let mut final_child = child
.env(ENV_VAR_GRIT_API_URL, get_grit_api_url())
.env(ENV_VAR_GRIT_AUTH_TOKEN, grit_token)
.env(ENV_GRIT_WORKSPACE_ROOT, root)
.arg("--file")
Expand All @@ -139,7 +155,14 @@ where
.spawn()
.expect("Failed to start worker");

let status = child.wait().await?;
let status = final_child.wait().await?;

// Stop the embedded server
#[cfg(feature = "workflow_server")]
let emitter = {
shutdown_tx.send(()).unwrap();
handle.await?
};

// TODO: pass along outcome message
if status.success() {
Expand Down
21 changes: 20 additions & 1 deletion crates/core/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,14 @@ pub struct InputFile {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "camelCase")]
pub struct Match {
#[serde(default)]
pub messages: Vec<Message>,
#[serde(default)]
pub variables: Vec<VariableMatch>,
pub source_file: String,
#[serde(default)]
pub ranges: Vec<Range>,
#[serde(default)]
pub debug: String,
}

Expand Down Expand Up @@ -330,7 +334,9 @@ impl FileMatchResult for Match {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "camelCase")]
pub struct EntireFile {
#[serde(default)]
pub messages: Vec<Message>,
#[serde(default)]
pub variables: Vec<VariableMatch>,
pub source_file: String,
pub content: String,
Expand All @@ -354,6 +360,8 @@ impl EntireFile {
pub struct Rewrite {
pub original: Match,
pub rewritten: EntireFile,
/// Deprecated
#[serde(default)]
pub ansi_summary: String,
pub reason: Option<RewriteReason>,
}
Expand Down Expand Up @@ -546,10 +554,21 @@ pub struct DoneFile {
pub has_results: Option<bool>,
#[serde(skip_serializing)]
pub file_hash: Option<[u8; 32]>,
#[serde(skip_serializing)]
#[serde(skip_serializing, skip_deserializing)]
pub from_cache: bool,
}

impl DoneFile {
pub fn new(relative_file_path: String) -> Self {
Self {
relative_file_path,
has_results: None,
file_hash: None,
from_cache: false,
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "camelCase")]
pub struct Message {
Expand Down
3 changes: 3 additions & 0 deletions crates/grit-util/src/ranges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::{ops::Add, path::PathBuf};
pub struct Range {
pub start: Position,
pub end: Position,
// TODO: automatically derive these from the start and end positions during deserialization
#[serde(skip_deserializing)]
pub start_byte: u32,
#[serde(skip_deserializing)]
pub end_byte: u32,
}

Expand Down
9 changes: 5 additions & 4 deletions crates/marzano_messenger/src/emit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,13 @@ pub trait Messager: Send + Sync {
}

/// Visibility levels dictate *which* objects we show (ex. just rewrites, or also every file analyzed)
#[derive(Debug, PartialEq, PartialOrd, Clone, Copy, ValueEnum, Serialize)]
#[derive(Debug, PartialEq, PartialOrd, Clone, Copy, ValueEnum, Serialize, Default)]
pub enum VisibilityLevels {
Primary = 3, // Always show this to users
Primary = 3, // Always show this to users
#[default]
Supplemental = 2, // Show to users as secondary information
Debug = 1, // Only show to users if they ask for it
Hidden = 0, // Never show to users
Debug = 1, // Only show to users if they ask for it
Hidden = 0, // Never show to users
}

impl std::fmt::Display for VisibilityLevels {
Expand Down
1 change: 1 addition & 0 deletions crates/marzano_messenger/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod emit;
pub mod format;
pub mod output_mode;
pub mod testing;
pub mod workflows;
34 changes: 34 additions & 0 deletions crates/marzano_messenger/src/testing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use anyhow::Result;
use marzano_core::api::MatchResult;

use crate::emit::Messager;

/// A testing messenger that doesn't actually send messages anywhere.
///
/// This should be used in tests to avoid sending messages to real backends.
pub struct TestingMessenger {
message_count: usize,
}

impl TestingMessenger {
pub fn new() -> Self {
Self { message_count: 0 }
}

pub fn message_count(&self) -> usize {
self.message_count
}
}

impl Default for TestingMessenger {
fn default() -> Self {
Self::new()
}
}

impl Messager for TestingMessenger {
fn raw_emit(&mut self, _message: &MatchResult) -> Result<()> {
self.message_count += 1;
Ok(())
}
}

0 comments on commit 0c81511

Please sign in to comment.