fix: local daemon deploy
Gege-Wang committed Jun 10, 2024
1 parent 513ac1b commit c501bd4
Showing 6 changed files with 75 additions and 69 deletions.
12 changes: 0 additions & 12 deletions binaries/coordinator/src/lib.rs
@@ -137,7 +137,6 @@ async fn start_inner(
let mut dataflow_results: HashMap<Uuid, BTreeMap<String, Result<(), String>>> = HashMap::new();
let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new();
let mut daemon_working_dirs: HashMap<String, PathBuf> = HashMap::new();

while let Some(event) = events.next().await {
if event.log() {
@@ -208,17 +207,6 @@ async fn start_inner(
"closing previous connection `{machine_id}` on new register"
);
}
if ip.is_loopback() {
daemon_working_dirs.insert(
machine_id.clone(),
PathBuf::from("/tmp/"), //TODO: Register Daemon working directory
);
} else {
daemon_working_dirs.insert(
machine_id.clone(),
PathBuf::from("/tmp/"), //TODO: Register Daemon working directory
);
}
}
(Err(err), _) => {
tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
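
Both arms of the removed if ip.is_loopback() check inserted the same hard-coded /tmp/ placeholder, so the daemon_working_dirs map (whose declaration is removed in the first hunk) never distinguished local from remote daemons. A minimal sketch of that redundancy, using a hypothetical helper name:

    use std::path::PathBuf;

    // Hypothetical helper mirroring the removed code: both branches produce
    // the same placeholder, so the branch carries no information.
    fn placeholder_working_dir(is_loopback: bool) -> PathBuf {
        if is_loopback {
            PathBuf::from("/tmp/")
        } else {
            PathBuf::from("/tmp/")
        }
        // Equivalent to just PathBuf::from("/tmp/"), which is why the map
        // could be dropped without changing behavior.
    }
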
36 changes: 20 additions & 16 deletions binaries/coordinator/src/run/mod.rs
@@ -1,4 +1,7 @@
use crate::{tcp_utils::{tcp_receive, tcp_send}, DaemonConnection};
use crate::{
tcp_utils::{tcp_receive, tcp_send},
DaemonConnection,
};

use dora_core::{
daemon_messages::{
@@ -37,22 +40,24 @@ pub(super) async fn spawn_dataflow(
.map(|c| (m.clone(), c.listen_socket))
})
.collect::<Result<BTreeMap<_, _>, _>>()?;
let working_dir = if machines.len() > 1 {
PathBuf::from(DEFAULT_WORKING_DIR)
} else {
working_dir
};
let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,
working_dir,
nodes: nodes.clone(),
machine_listen_ports: machine_listen_ports.clone(),
dataflow_descriptor: dataflow.clone(),
};
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Spawn(spawn_command),
timestamp: clock.new_timestamp(),
})?;


for machine in &machines {
let working_dir = PathBuf::from(DEFAULT_WORKING_DIR);

let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,
working_dir,
nodes: nodes.clone(),
machine_listen_ports: machine_listen_ports.clone(),
dataflow_descriptor: dataflow.clone(),
};
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Spawn(spawn_command),
timestamp: clock.new_timestamp(),
})?;
tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`");
spawn_dataflow_on_machine(daemon_connections, machine, &message)
.await
@@ -68,7 +73,6 @@ pub(super) async fn spawn_dataflow(
})
}


async fn spawn_dataflow_on_machine(
daemon_connections: &mut HashMap<String, DaemonConnection>,
machine: &str,
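
Read together with the commit title, the machines.len() > 1 conditional appears to be the newly added code: the spawn message is now serialized once before the per-machine loop, and the caller-supplied working_dir is kept whenever only a single machine (the local daemon) is targeted, falling back to DEFAULT_WORKING_DIR only for distributed runs. A self-contained sketch of just that selection rule, with a placeholder value for DEFAULT_WORKING_DIR:

    use std::path::PathBuf;

    // Placeholder value; the real constant lives in the dora codebase.
    const DEFAULT_WORKING_DIR: &str = "/tmp";

    fn select_working_dir(machine_count: usize, local_working_dir: PathBuf) -> PathBuf {
        if machine_count > 1 {
            // Distributed deploy: no shared local directory can be assumed.
            PathBuf::from(DEFAULT_WORKING_DIR)
        } else {
            // Local deploy: keep the directory the dataflow was started from.
            local_working_dir
        }
    }
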
7 changes: 4 additions & 3 deletions binaries/daemon/src/spawn.rs
@@ -8,7 +8,8 @@ use dora_core::{
config::{DataId, NodeRunConfig},
daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped},
descriptor::{
resolve_path, source_is_url, source_to_path, Descriptor, OperatorDefinition, OperatorSource, PythonSource, ResolvedNode, SHELL_SOURCE
resolve_path, source_is_url, source_to_path, Descriptor, OperatorDefinition,
OperatorSource, PythonSource, ResolvedNode, SHELL_SOURCE,
},
get_python_path,
message::uhlc::HLC,
@@ -88,9 +89,9 @@ pub async fn spawn_node(
target_path.clone()
} else {
let path = source_to_path(source);
if path.is_absolute(){
if path.is_absolute() {
path
} else{
} else {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("failed to resolve node source `{}`", source)
})?
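
The hunk above is a formatting-only change; the logic stays the same: an absolute node source is used as-is, a relative one is resolved against the daemon's working directory. A small illustration with hypothetical paths, standard library only:

    use std::path::{Path, PathBuf};

    fn main() {
        let working_dir = Path::new("/home/user/my-dataflow");
        // Relative source: looked up under the working directory.
        assert_eq!(
            working_dir.join("target/debug/example-node"),
            PathBuf::from("/home/user/my-dataflow/target/debug/example-node")
        );
        // Absolute source: taken verbatim, the working directory does not apply.
        assert!(Path::new("/opt/example-node").is_absolute());
    }
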
6 changes: 3 additions & 3 deletions examples/multiple-daemons/dataflow.yml
@@ -4,7 +4,7 @@ nodes:
machine: A
custom:
build: cargo build -p multiple-daemons-example-node
source: ../../target/debug/multiple-daemons-example-node
source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-node
inputs:
tick: dora/timer/millis/10
outputs:
@@ -15,7 +15,7 @@
operators:
- id: rust-operator
build: cargo build -p multiple-daemons-example-operator
shared-library: ../../target/debug/multiple_daemons_example_operator
shared-library: /home/runner/work/dora/dora/target/debug/multiple_daemons_example_operator
inputs:
tick: dora/timer/millis/100
random: rust-node/random
@@ -26,6 +26,6 @@ nodes:
machine: B
custom:
build: cargo build -p multiple-daemons-example-sink
source: ../../target/debug/multiple-daemons-example-sink
source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-sink
inputs:
message: runtime-node/rust-operator/status
10 changes: 3 additions & 7 deletions libraries/core/src/descriptor/mod.rs
@@ -133,11 +133,8 @@ impl Descriptor {
}

pub fn check(&self, working_dir: &Path) -> eyre::Result<()> {

validate::check_dataflow(self, working_dir)
.wrap_err("Dataflow could not be validated.")
validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.")
}

}

#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
@@ -434,12 +431,11 @@ pub fn resolve_path(source: &str, working_dir: &Path) -> Result<PathBuf> {
}
pub fn source_to_path(source: &str) -> PathBuf {
let path = Path::new(&source);
let path = if path.extension().is_none() {
if path.extension().is_none() {
path.with_extension(EXE_EXTENSION)
} else {
path.to_owned()
};
path
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
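
The rewritten source_to_path returns the if/else expression directly instead of binding it to a temporary; behavior is unchanged: a source with no extension gets the platform's executable extension, a source that already has one is returned as-is. Assuming EXE_EXTENSION is std::env::consts::EXE_EXTENSION, the effect is roughly:

    use std::env::consts::EXE_EXTENSION;
    use std::path::Path;

    fn main() {
        // "example-node" becomes "example-node.exe" on Windows; on Unix
        // EXE_EXTENSION is empty, so the path is unchanged.
        let bare = Path::new("example-node").with_extension(EXE_EXTENSION);
        println!("{}", bare.display());
        // A source that already has an extension (e.g. a script) keeps it.
        assert!(Path::new("example-node.py").extension().is_some());
    }
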
73 changes: 45 additions & 28 deletions libraries/core/src/descriptor/validate.rs
@@ -6,20 +6,22 @@ use crate::{
};

use eyre::{bail, eyre, Context};
use std::collections::HashSet;
use std::{path::Path, process::Command};
use tracing::info;
use std::collections::HashSet;

use super::{resolve_path, source_to_path, Descriptor, SHELL_SOURCE};
const VERSION: &str = env!("CARGO_PKG_VERSION");

pub fn check_dataflow(
dataflow: &Descriptor,
working_dir: &Path,
) -> eyre::Result<()> {
pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> {
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let mut has_python_operator = false;
let is_multiple = nodes.iter().map(|n| &n.deploy.machine).collect::<HashSet<_>>().len() > 1;
let is_multiple = nodes
.iter()
.map(|n| &n.deploy.machine)
.collect::<HashSet<_>>()
.len()
> 1;

// check that nodes and operators exist
for node in &nodes {
@@ -30,18 +32,18 @@ pub fn check_dataflow(
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.
} else if !is_multiple {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("Could not find source path `{}`", source)
})?;

} else {
let path = source_to_path(source);
if path.is_relative() {
eyre::bail!("paths of remote nodes must be absolute (node `{}`)", node.id);
}
info!("skipping path check for remote node `{}`", node.id);

resolve_path(source, working_dir)
.wrap_err_with(|| format!("Could not find source path `{}`", source))?;
} else {
let path = source_to_path(source);
if path.is_relative() {
eyre::bail!(
"paths of remote nodes must be absolute (node `{}`)",
node.id
);
}
info!("skipping path check for remote node `{}`", node.id);
}
}
},
descriptor::CoreNodeKind::Runtime(node) => {
@@ -50,18 +52,23 @@
OperatorSource::SharedLibrary(path) => {
if source_is_url(path) {
info!("{path} is a URL."); // TODO: Implement url check.
} else if !is_multiple{
} else if !is_multiple {
let path = adjust_shared_library_path(Path::new(&path))?;
if !working_dir.join(&path).exists() {
bail!("no shared library at `{}`", path.display());
}
}else {
} else {
let path = source_to_path(path);
if path.is_relative() {
eyre::bail!("paths of operator must be absolute (operator `{}`)", operator_definition.id);
eyre::bail!(
"paths of operator must be absolute (operator `{}`)",
operator_definition.id
);
}
info!("skipping path check for remote operator `{}`", operator_definition.id);

info!(
"skipping path check for remote operator `{}`",
operator_definition.id
);
}
}
OperatorSource::Python(python_source) => {
@@ -76,10 +83,15 @@
} else {
let path = source_to_path(path);
if path.is_relative() {
eyre::bail!("paths of python operator must be absolute (operator `{}`)", operator_definition.id);
eyre::bail!(
"paths of python operator must be absolute (operator `{}`)",
operator_definition.id
);
}
info!("skipping path check for remote python operator `{}`", operator_definition.id);

info!(
"skipping path check for remote python operator `{}`",
operator_definition.id
);
}
}
OperatorSource::Wasm(path) => {
@@ -92,10 +104,15 @@
} else {
let path = source_to_path(path);
if path.is_relative() {
eyre::bail!("paths of Wasm operator must be absolute (operator `{}`)", operator_definition.id);
eyre::bail!(
"paths of Wasm operator must be absolute (operator `{}`)",
operator_definition.id
);
}
info!("skipping path check for remote Wasm operator `{}`", operator_definition.id);

info!(
"skipping path check for remote Wasm operator `{}`",
operator_definition.id
);
}
}
}
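
The same pattern repeats for custom nodes and for shared-library, Python, and Wasm operators: single-machine dataflows must have locally resolvable sources, while multi-machine dataflows skip the existence check but require absolute paths for anything that runs remotely. A minimal sketch of that rule, not the actual dora API, with a simplified error type:

    use std::path::Path;

    fn check_source(source: &Path, is_multiple: bool, working_dir: &Path) -> Result<(), String> {
        if is_multiple {
            if source.is_relative() {
                return Err("paths of remote nodes must be absolute".to_string());
            }
            // The file may live on another machine, so existence is not checked here.
            Ok(())
        } else if working_dir.join(source).exists() {
            Ok(())
        } else {
            Err(format!("could not find source path `{}`", source.display()))
        }
    }
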
