Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure remote working directory #545

Closed
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
path::PathBuf,
};
use uuid::{NoContext, Timestamp, Uuid};
const DEFAULT_WORKING_DIR: &str = "/tmp";
haixuanTao marked this conversation as resolved.
Show resolved Hide resolved

#[tracing::instrument(skip(daemon_connections, clock))]
pub(super) async fn spawn_dataflow(
Expand All @@ -39,7 +40,11 @@ pub(super) async fn spawn_dataflow(
.map(|c| (m.clone(), c.listen_socket))
})
.collect::<Result<BTreeMap<_, _>, _>>()?;

let working_dir = if machines.len() > 1 {
PathBuf::from(DEFAULT_WORKING_DIR)
} else {
working_dir
};
let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,
working_dir,
Expand Down
15 changes: 10 additions & 5 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
config::{DataId, NodeRunConfig},
daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped},
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource,
ResolvedNode, SHELL_SOURCE,
resolve_path, source_is_url, source_to_path, Descriptor, OperatorDefinition,
OperatorSource, PythonSource, ResolvedNode, SHELL_SOURCE,
},
get_python_path,
message::uhlc::HLC,
Expand All @@ -32,7 +32,7 @@
io::{AsyncBufReadExt, AsyncWriteExt},
sync::{mpsc, oneshot},
};
use tracing::{debug, error};

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Bench (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Bench (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Bench (macos-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Bench (macos-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Test (macos-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Test (macos-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Test (macos-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Bench (windows-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Bench (windows-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Test (windows-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Test (windows-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Test (windows-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `debug`

Check warning on line 35 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `debug`

/// clock is required for generating timestamps when dropping messages early because queue is full
pub async fn spawn_node(
Expand Down Expand Up @@ -88,9 +88,14 @@
.wrap_err("failed to download custom node")?;
target_path.clone()
} else {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("failed to resolve node source `{}`", source)
})?
let path = source_to_path(source);
if path.is_absolute() {
path
} else {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("failed to resolve node source `{}`", source)
})?
}
};

// If extension is .py, use python to run the script
Expand Down
6 changes: 3 additions & 3 deletions examples/multiple-daemons/dataflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ nodes:
machine: A
custom:
build: cargo build -p multiple-daemons-example-node
source: ../../target/debug/multiple-daemons-example-node
source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-node
haixuanTao marked this conversation as resolved.
Show resolved Hide resolved
inputs:
tick: dora/timer/millis/10
outputs:
Expand All @@ -15,7 +15,7 @@ nodes:
operators:
- id: rust-operator
build: cargo build -p multiple-daemons-example-operator
shared-library: ../../target/debug/multiple_daemons_example_operator
shared-library: /home/runner/work/dora/dora/target/debug/multiple_daemons_example_operator
inputs:
tick: dora/timer/millis/100
random: rust-node/random
Expand All @@ -26,6 +26,6 @@ nodes:
machine: B
custom:
build: cargo build -p multiple-daemons-example-sink
source: ../../target/debug/multiple-daemons-example-sink
source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-sink
inputs:
message: runtime-node/rust-operator/status
15 changes: 9 additions & 6 deletions libraries/core/src/descriptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,7 @@ pub fn source_is_url(source: &str) -> bool {
}

pub fn resolve_path(source: &str, working_dir: &Path) -> Result<PathBuf> {
let path = Path::new(&source);
let path = if path.extension().is_none() {
path.with_extension(EXE_EXTENSION)
} else {
path.to_owned()
};
let path = source_to_path(source);

// Search path within current working directory
if let Ok(abs_path) = working_dir.join(&path).canonicalize() {
Expand All @@ -434,6 +429,14 @@ pub fn resolve_path(source: &str, working_dir: &Path) -> Result<PathBuf> {
bail!("Could not find source path {}", path.display())
}
}
pub fn source_to_path(source: &str) -> PathBuf {
let path = Path::new(&source);
if path.extension().is_none() {
path.with_extension(EXE_EXTENSION)
} else {
path.to_owned()
}
}

haixuanTao marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
Expand Down
74 changes: 65 additions & 9 deletions libraries/core/src/descriptor/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,44 @@ use crate::{
};

use eyre::{bail, eyre, Context};
use std::collections::HashSet;
use std::{path::Path, process::Command};
use tracing::info;

use super::{resolve_path, Descriptor, SHELL_SOURCE};
use super::{resolve_path, source_to_path, Descriptor, SHELL_SOURCE};
const VERSION: &str = env!("CARGO_PKG_VERSION");

pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> {
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let mut has_python_operator = false;
let is_multiple = nodes
haixuanTao marked this conversation as resolved.
Show resolved Hide resolved
.iter()
.map(|n| &n.deploy.machine)
.collect::<HashSet<_>>()
.len()
> 1;

// check that nodes and operators exist
for node in &nodes {
match &node.kind {
descriptor::CoreNodeKind::Custom(node) => match node.source.as_str() {
descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() {
SHELL_SOURCE => (),
source => {
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.
} else {
} else if !is_multiple {
resolve_path(source, working_dir)
.wrap_err_with(|| format!("Could not find source path `{}`", source))?;
};
} else {
let path = source_to_path(source);
if path.is_relative() {
eyre::bail!(
"paths of remote nodes must be absolute (node `{}`)",
node.id
);
}
info!("skipping path check for remote node `{}`", node.id);
}
}
},
descriptor::CoreNodeKind::Runtime(node) => {
Expand All @@ -36,27 +52,67 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result
OperatorSource::SharedLibrary(path) => {
if source_is_url(path) {
info!("{path} is a URL."); // TODO: Implement url check.
} else {
} else if !is_multiple {
let path = adjust_shared_library_path(Path::new(&path))?;
if !working_dir.join(&path).exists() {
bail!("no shared library at `{}`", path.display());
}
} else {
let path = source_to_path(path);
if path.is_relative() {
eyre::bail!(
"paths of operator must be absolute (operator `{}`)",
operator_definition.id
);
}
info!(
"skipping path check for remote operator `{}`",
operator_definition.id
);
}
}
OperatorSource::Python(python_source) => {
has_python_operator = true;
let path = &python_source.source;
if source_is_url(path) {
info!("{path} is a URL."); // TODO: Implement url check.
} else if !working_dir.join(path).exists() {
bail!("no Python library at `{path}`");
} else if !is_multiple {
if !working_dir.join(path).exists() {
bail!("no Python library at `{path}`");
}
} else {
let path = source_to_path(path);
if path.is_relative() {
eyre::bail!(
"paths of python operator must be absolute (operator `{}`)",
operator_definition.id
);
}
info!(
"skipping path check for remote python operator `{}`",
operator_definition.id
);
}
}
OperatorSource::Wasm(path) => {
if source_is_url(path) {
info!("{path} is a URL."); // TODO: Implement url check.
} else if !working_dir.join(path).exists() {
bail!("no WASM library at `{path}`");
} else if !is_multiple {
if !working_dir.join(path).exists() {
bail!("no WASM library at `{path}`");
}
} else {
let path = source_to_path(path);
if path.is_relative() {
eyre::bail!(
"paths of Wasm operator must be absolute (operator `{}`)",
operator_definition.id
);
}
info!(
"skipping path check for remote Wasm operator `{}`",
operator_definition.id
);
}
}
}
Expand Down
Loading