Skip to content

Commit

Permalink
add: skip check operator in multiple-daemons
Browse files Browse the repository at this point in the history
  • Loading branch information
Gege-Wang committed Jun 10, 2024
1 parent 8a67bfd commit 513ac1b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
10 changes: 1 addition & 9 deletions binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub(super) async fn spawn_dataflow(


for machine in &machines {
let working_dir = find_working_dir(daemon_connections, machine, working_dir.clone());
let working_dir = PathBuf::from(DEFAULT_WORKING_DIR);

let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,
Expand Down Expand Up @@ -68,14 +68,6 @@ pub(super) async fn spawn_dataflow(
})
}

fn find_working_dir(daemon_connections: &mut HashMap<String, DaemonConnection>, machine: &str, working_dir: PathBuf) -> PathBuf {
if daemon_connections.get_mut(machine).unwrap().listen_socket.ip().is_loopback() {
working_dir
} else {
PathBuf::from(DEFAULT_WORKING_DIR)
}

}

async fn spawn_dataflow_on_machine(
daemon_connections: &mut HashMap<String, DaemonConnection>,
Expand Down
39 changes: 33 additions & 6 deletions libraries/core/src/descriptor/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
use eyre::{bail, eyre, Context};
use std::{path::Path, process::Command};
use tracing::info;
use std::collections::HashSet;

use super::{resolve_path, source_to_path, Descriptor, SHELL_SOURCE};
const VERSION: &str = env!("CARGO_PKG_VERSION");
Expand All @@ -18,6 +19,7 @@ pub fn check_dataflow(
) -> eyre::Result<()> {
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let mut has_python_operator = false;
let is_multiple = nodes.iter().map(|n| &n.deploy.machine).collect::<HashSet<_>>().len() > 1;

// check that nodes and operators exist
for node in &nodes {
Expand All @@ -27,7 +29,7 @@ pub fn check_dataflow(
source => {
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.
} else if node.deploy.machine.is_empty() {
} else if !is_multiple {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("Could not find source path `{}`", source)
})?;
Expand All @@ -48,27 +50,52 @@ pub fn check_dataflow(
OperatorSource::SharedLibrary(path) => {
if source_is_url(path) {
info!("{path} is a URL."); // TODO: Implement url check.
} else {
} else if !is_multiple{
let path = adjust_shared_library_path(Path::new(&path))?;
if !working_dir.join(&path).exists() {
bail!("no shared library at `{}`", path.display());
}
}else {
let path = source_to_path(path);
if path.is_relative() {
eyre::bail!("paths of operator must be absolute (operator `{}`)", operator_definition.id);
}
info!("skipping path check for remote operator `{}`", operator_definition.id);

}
}
OperatorSource::Python(python_source) => {
has_python_operator = true;
let path = &python_source.source;
if source_is_url(path) {
info!("{path} is a URL."); // TODO: Implement url check.
} else if !working_dir.join(path).exists() {
bail!("no Python library at `{path}`");
} else if !is_multiple {
if !working_dir.join(path).exists() {
bail!("no Python library at `{path}`");
}
} else {
let path = source_to_path(path);
if path.is_relative() {
eyre::bail!("paths of python operator must be absolute (operator `{}`)", operator_definition.id);
}
info!("skipping path check for remote python operator `{}`", operator_definition.id);

}
}
OperatorSource::Wasm(path) => {
if source_is_url(path) {
info!("{path} is a URL."); // TODO: Implement url check.
} else if !working_dir.join(path).exists() {
bail!("no WASM library at `{path}`");
} else if !is_multiple {
if !working_dir.join(path).exists() {
bail!("no WASM library at `{path}`");
}
} else {
let path = source_to_path(path);
if path.is_relative() {
eyre::bail!("paths of Wasm operator must be absolute (operator `{}`)", operator_definition.id);
}
info!("skipping path check for remote Wasm operator `{}`", operator_definition.id);

}
}
}
Expand Down

0 comments on commit 513ac1b

Please sign in to comment.