Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix distributed node #658

Merged
merged 8 commits into from
Sep 19, 2024
14 changes: 9 additions & 5 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,11 @@ enum Command {
#[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)]
local_listen_port: u16,
/// IP address of the dora coordinator
#[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))]
coordinator_addr: SocketAddr,
#[clap(long, short, default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the dora coordinator that the daemon connects to
#[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)]
coordinator_port: u16,
#[clap(long, hide = true)]
run_dataflow: Option<PathBuf>,
/// Suppresses all log output to stdout.
Expand Down Expand Up @@ -469,6 +472,7 @@ fn run() -> eyre::Result<()> {
}
Command::Daemon {
coordinator_addr,
coordinator_port,
inter_daemon_addr,
local_listen_port,
machine_id,
Expand All @@ -483,7 +487,7 @@ fn run() -> eyre::Result<()> {
match run_dataflow {
Some(dataflow_path) => {
tracing::info!("Starting dataflow `{}`", dataflow_path.display());
if coordinator_addr != SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT){
if coordinator_addr != LOCALHOST {
tracing::info!(
"Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator",
coordinator_addr
Expand All @@ -494,10 +498,10 @@ fn run() -> eyre::Result<()> {
handle_dataflow_result(result, None)
}
None => {
if coordinator_addr.ip() == LOCALHOST {
if coordinator_addr == LOCALHOST {
tracing::info!("Starting in local mode");
}
Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await
Daemon::run(SocketAddr::new(coordinator_addr, coordinator_port), machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await
}
}
})
Expand Down
7 changes: 7 additions & 0 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,13 @@ impl Daemon {
}
}

// Use the given working directory if it exists; otherwise fall back to the daemon process's current working directory
let working_dir = if working_dir.exists() {
working_dir
} else {
std::env::current_dir().wrap_err("failed to get current working dir")?
};

let result = self
.spawn_dataflow(dataflow_id, working_dir, nodes, dataflow_descriptor)
.await;
Expand Down
4 changes: 2 additions & 2 deletions examples/multiple-daemons/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ async fn main() -> eyre::Result<()> {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
);
let (coordinator_port, coordinator) = dora_coordinator::start(
let (_coordinator_port, coordinator) = dora_coordinator::start(
coordinator_bind,
coordinator_control_bind,
ReceiverStream::new(coordinator_events_rx),
)
.await?;
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
let coordinator_addr = Ipv4Addr::LOCALHOST;
let daemon_a = run_daemon(coordinator_addr.to_string(), "A", 9843); // Random port
let daemon_b = run_daemon(coordinator_addr.to_string(), "B", 9842);

Expand Down
18 changes: 1 addition & 17 deletions libraries/core/src/descriptor/validate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
adjust_shared_library_path,
config::{DataId, Input, InputMapping, OperatorId, UserInputMapping},
descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION},
descriptor::{self, source_is_url, CoreNodeKind, OperatorSource},
get_python_path,
};

Expand Down Expand Up @@ -34,23 +34,7 @@ pub fn check_dataflow(
if remote_daemon_id.contains(&node.deploy.machine.as_str())
|| coordinator_is_remote
{
let path = Path::new(&source);
let path = if path.extension().is_none() {
path.with_extension(EXE_EXTENSION)
} else {
path.to_owned()
};
if path.is_relative() {
eyre::bail!(
"paths of remote nodes must be absolute (node `{}`)",
node.id
);
}
info!("skipping path check for remote node `{}`", node.id);
} else {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("Could not find source path `{}`", source)
})?;
}
} else {
resolve_path(source, working_dir)
Expand Down
6 changes: 3 additions & 3 deletions libraries/core/src/topics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::net::{IpAddr, Ipv4Addr};

pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A;
pub const DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT: u16 = 0xD02B;
pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C;
pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 53290;
pub const DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT: u16 = 53291;
pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 6012;

pub const MANUAL_STOP: &str = "dora/stop";
Loading