Configure remote working directory #545

Closed

2 changes: 2 additions & 0 deletions Cargo.lock

10 changes: 3 additions & 7 deletions binaries/cli/src/main.rs
@@ -351,13 +351,9 @@ fn run() -> eyre::Result<()> {
             .parent()
             .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
             .to_owned();
-        if !coordinator_addr.is_loopback() {
-            dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?;
-        } else {
-            dataflow_descriptor
-                .check(&working_dir)
-                .wrap_err("Could not validate yaml")?;
-        }
+        dataflow_descriptor
+            .check(&working_dir)
+            .wrap_err("Could not validate yaml")?;
 
         let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
             .wrap_err("failed to connect to dora coordinator")?;
1 change: 1 addition & 0 deletions binaries/coordinator/Cargo.toml
@@ -25,3 +25,4 @@ futures-concurrency = "7.1.0"
 serde_json = "1.0.86"
 names = "0.14.0"
 ctrlc = "3.2.5"
+dirs = "5.0.1"
1 change: 1 addition & 0 deletions binaries/coordinator/src/lib.rs
@@ -618,6 +618,7 @@ fn dataflow_result(
     }
 }
 
+#[derive(Debug)]
 struct DaemonConnection {
     stream: TcpStream,
     listen_socket: SocketAddr,
13 changes: 0 additions & 13 deletions binaries/coordinator/src/run/mod.rs
@@ -24,18 +24,6 @@ pub(super) async fn spawn_dataflow(
     daemon_connections: &mut HashMap<String, DaemonConnection>,
     clock: &HLC,
 ) -> eyre::Result<SpawnedDataflow> {
-    let remote_machine_id: Vec<_> = daemon_connections
-        .iter()
-        .filter_map(|(id, c)| {
-            if !c.listen_socket.ip().is_loopback() {
-                Some(id.as_str())
-            } else {
-                None
-            }
-        })
-        .collect();
-    dataflow.check_in_daemon(&working_dir, &remote_machine_id, false)?;
-
     let nodes = dataflow.resolve_aliases_and_set_defaults()?;
     let uuid = Uuid::new_v7(Timestamp::now(NoContext));
 
@@ -49,7 +37,6 @@
                 .map(|c| (m.clone(), c.listen_socket))
         })
         .collect::<Result<BTreeMap<_, _>, _>>()?;
-
     let spawn_command = SpawnDataflowNodes {
         dataflow_id: uuid,
         working_dir,
1 change: 1 addition & 0 deletions binaries/daemon/Cargo.toml
@@ -39,3 +39,4 @@ aligned-vec = "0.5.0"
 ctrlc = "3.2.5"
 which = "5.0.0"
 sysinfo = "0.30.11"
+dirs = "5.0.1"
43 changes: 37 additions & 6 deletions binaries/daemon/src/lib.rs
@@ -348,7 +348,16 @@ impl Daemon {
         match dataflow_descriptor.communication.remote {
             dora_core::config::RemoteCommunicationConfig::Tcp => {}
         }
-        for (machine_id, socket) in machine_listen_ports {
+        for (machine_id, mut socket) in machine_listen_ports {
+            if socket.ip().is_loopback() {
+                if let Some(ref coordinator_socket) = self.coordinator_connection {
+                    let new_ip = coordinator_socket
+                        .peer_addr()
+                        .wrap_err("failed to get peer address of coordinator")?
+                        .ip();
+                    socket = SocketAddr::new(new_ip, socket.port());
+                }
+            }
             match self.inter_daemon_connections.entry(machine_id) {
                 std::collections::btree_map::Entry::Vacant(entry) => {
                     entry.insert(InterDaemonConnection::new(socket));
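
The branch added above covers daemons that advertise a loopback listen address: 127.0.0.1 is meaningless to other machines, so the daemon substitutes the coordinator's IP, which it already knows from its own connection. A minimal standalone sketch of that substitution (the addresses are made-up examples, and rewrite_loopback is an illustrative helper, not part of the PR):

use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// Replace a loopback IP with the coordinator's routable IP,
/// keeping the advertised port unchanged.
fn rewrite_loopback(advertised: SocketAddr, coordinator_ip: IpAddr) -> SocketAddr {
    if advertised.ip().is_loopback() {
        SocketAddr::new(coordinator_ip, advertised.port())
    } else {
        advertised
    }
}

fn main() {
    let advertised = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 53290);
    let coordinator_ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
    assert_eq!(
        rewrite_loopback(advertised, coordinator_ip).to_string(),
        "192.168.1.10:53290"
    );
}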
Expand Down Expand Up @@ -564,15 +573,12 @@ impl Daemon {
) -> eyre::Result<()> {
let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone());
let dataflow = match self.running.entry(dataflow_id) {
std::collections::hash_map::Entry::Vacant(entry) => {
self.working_dir.insert(dataflow_id, working_dir.clone());
entry.insert(dataflow)
}
std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow),
std::collections::hash_map::Entry::Occupied(_) => {
bail!("there is already a running dataflow with ID `{dataflow_id}`")
}
};

let mut working_dir = working_dir;
for node in nodes {
let local = node.deploy.machine == self.machine_id;

@@ -614,6 +620,31 @@
             dataflow.pending_nodes.insert(node.id.clone());
 
             let node_id = node.id.clone();
+            match &node.deploy.working_dir {
+                Some(local_working_dir) => {
+                    working_dir = PathBuf::from(local_working_dir);
+                }
+                None => {
+                    if !node.deploy.local {
+                        working_dir = dirs::home_dir()
+                            .wrap_err("failed to get home dir and change working dir")?;
+                    }
+                    tracing::debug!("no working_dir specified for this node, using {working_dir:?}");
+                }
+            }
+            match self.working_dir.entry(dataflow_id) {
+                std::collections::hash_map::Entry::Vacant(entry) => {
+                    entry.insert(working_dir.clone());
+                }
+                std::collections::hash_map::Entry::Occupied(entry) => {
+                    tracing::info!(
+                        "working_dir for dataflow {} in daemon {} already exists: {:?}",
+                        dataflow_id,
+                        node.deploy.machine,
+                        entry.get()
+                    );
+                }
+            }
             match spawn::spawn_node(
                 dataflow_id,
                 &working_dir,
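
Taken together, the spawn logic above gives each node's working directory a three-step precedence: an explicit working_dir from the deploy section wins, remote nodes otherwise fall back to the daemon's home directory, and local nodes keep the directory passed in by the coordinator (the dataflow file's parent directory). A simplified sketch of that precedence, assuming the dirs crate from the updated Cargo.toml (the function name is illustrative, not the PR's API):

use std::path::PathBuf;

/// Illustrative precedence for a node's working directory.
fn node_working_dir(
    deploy_working_dir: Option<&str>,
    is_local: bool,
    dataflow_dir: PathBuf,
) -> Option<PathBuf> {
    match deploy_working_dir {
        // 1. An explicit deploy-section working_dir always wins.
        Some(dir) => Some(PathBuf::from(dir)),
        // 2. Remote nodes default to the daemon's home directory.
        None if !is_local => dirs::home_dir(),
        // 3. Local nodes keep the dataflow's parent directory.
        None => Some(dataflow_dir),
    }
}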
15 changes: 10 additions & 5 deletions binaries/daemon/src/spawn.rs
@@ -8,8 +8,8 @@ use dora_core::{
     config::DataId,
     daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped},
     descriptor::{
-        resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource,
-        ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE,
+        resolve_path, source_is_url, source_to_path, Descriptor, OperatorDefinition,
+        OperatorSource, PythonSource, ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE,
     },
     get_python_path,
     message::uhlc::HLC,
@@ -103,9 +103,14 @@ pub async fn spawn_node(
                 .wrap_err("failed to download custom node")?;
             target_path.clone()
         } else {
-            resolve_path(source, working_dir).wrap_err_with(|| {
-                format!("failed to resolve node source `{}`", source)
-            })?
+            let path = source_to_path(source);
+            if path.is_absolute() {
+                path
+            } else {
+                resolve_path(source, working_dir).wrap_err_with(|| {
+                    format!("failed to resolve node source `{}`", source)
+                })?
+            }
         };
 
         // If extension is .py, use python to run the script
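
With this change an absolute source path short-circuits the lookup: it is used as-is, and only relative sources are resolved against the daemon's working directory. A simplified stand-in for the source_to_path/resolve_path pair (no EXE_EXTENSION handling; the function name is illustrative):

use std::path::{Path, PathBuf};

/// Absolute sources are taken verbatim; relative ones are
/// canonicalized against the daemon's working directory.
fn resolve_source(source: &str, working_dir: &Path) -> std::io::Result<PathBuf> {
    let path = PathBuf::from(source);
    if path.is_absolute() {
        Ok(path)
    } else {
        working_dir.join(path).canonicalize()
    }
}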
9 changes: 7 additions & 2 deletions examples/multiple-daemons/dataflow.yml
@@ -2,6 +2,7 @@ nodes:
   - id: rust-node
     _unstable_deploy:
       machine: A
+      local: true
     custom:
       build: cargo build -p multiple-daemons-example-node
       source: ../../target/debug/multiple-daemons-example-node
@@ -11,7 +12,10 @@
       - random
   - id: runtime-node
     _unstable_deploy:
-      machine: A
+      machine: B
+      local: false
+      # This path is for CI; replace it with the working_dir of your remote daemon
+      working_dir: /home/runner/work/dora/dora/examples/multiple-daemons
     operators:
       - id: rust-operator
         build: cargo build -p multiple-daemons-example-operator
@@ -23,7 +27,8 @@
       - status
   - id: rust-sink
     _unstable_deploy:
-      machine: B
+      machine: A
+      local: true
     custom:
       build: cargo build -p multiple-daemons-example-sink
       source: ../../target/debug/multiple-daemons-example-sink
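
For reference, a minimal deploy section using the new fields might look like the following (machine name and path are placeholders; judging from the resolution code above, local defaults to true, and working_dir is optional, with remote nodes falling back to their daemon's home directory):

nodes:
  - id: remote-node
    _unstable_deploy:
      machine: B                  # daemon that should run this node
      local: false                # node runs on a remote daemon
      working_dir: /srv/dora/app  # optional; defaults to the remote daemon's home dir
    custom:
      source: target/release/my-node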
43 changes: 19 additions & 24 deletions libraries/core/src/descriptor/mod.rs
@@ -134,30 +134,16 @@ impl Descriptor {
     }
 
     pub fn check(&self, working_dir: &Path) -> eyre::Result<()> {
-        validate::check_dataflow(self, working_dir, None, false)
-            .wrap_err("Dataflow could not be validated.")
+        validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.")
     }
-
-    pub fn check_in_daemon(
-        &self,
-        working_dir: &Path,
-        remote_machine_id: &[&str],
-        coordinator_is_remote: bool,
-    ) -> eyre::Result<()> {
-        validate::check_dataflow(
-            self,
-            working_dir,
-            Some(remote_machine_id),
-            coordinator_is_remote,
-        )
-        .wrap_err("Dataflow could not be validated.")
-    }
 }
 
 #[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
 #[serde(deny_unknown_fields)]
 pub struct Deploy {
     pub machine: Option<String>,
+    pub local: Option<bool>,
+    pub working_dir: Option<String>,
 }
 
 /// Dora Node
@@ -315,6 +301,8 @@
 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct ResolvedDeploy {
     pub machine: String,
+    pub local: bool,
+    pub working_dir: Option<String>,
 }
 impl ResolvedDeploy {
     fn new(deploy: Deploy, descriptor: &Descriptor) -> Self {
@@ -323,7 +311,11 @@ impl ResolvedDeploy {
             Some(m) => m,
             None => default_machine.to_owned(),
         };
-        Self { machine }
+        Self {
+            machine,
+            local: deploy.local.unwrap_or(true),
+            working_dir: deploy.working_dir.clone(),
+        }
     }
 }
 
@@ -480,12 +472,7 @@ pub fn source_is_url(source: &str) -> bool {
 }
 
 pub fn resolve_path(source: &str, working_dir: &Path) -> Result<PathBuf> {
-    let path = Path::new(&source);
-    let path = if path.extension().is_none() {
-        path.with_extension(EXE_EXTENSION)
-    } else {
-        path.to_owned()
-    };
+    let path = source_to_path(source);
 
     // Search path within current working directory
     if let Ok(abs_path) = working_dir.join(&path).canonicalize() {
@@ -497,6 +484,14 @@ pub fn resolve_path(source: &str, working_dir: &Path) -> Result<PathBuf> {
         bail!("Could not find source path {}", path.display())
     }
 }
+pub fn source_to_path(source: &str) -> PathBuf {
+    let path = Path::new(&source);
+    if path.extension().is_none() {
+        path.with_extension(EXE_EXTENSION)
+    } else {
+        path.to_owned()
+    }
+}
 
 #[derive(Debug, Serialize, Deserialize, Clone)]
 #[serde(deny_unknown_fields)]
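
The extracted source_to_path helper only normalizes the executable extension, presumably via the standard library's EXE_EXTENSION (empty on Unix, "exe" on Windows). A small usage sketch:

use std::env::consts::EXE_EXTENSION;
use std::path::Path;

fn main() {
    // Mirror source_to_path: add the platform executable extension
    // only when the source has no extension of its own.
    let path = Path::new("my-node");
    let resolved = if path.extension().is_none() {
        path.with_extension(EXE_EXTENSION)
    } else {
        path.to_path_buf()
    };
    // Prints "my-node.exe" on Windows and "my-node" on Unix.
    println!("{}", resolved.display());
}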