Skip to content

Commit

Permalink
Allow users to select the bind address (ip) to use with --bind (#2159)
Browse files Browse the repository at this point in the history
* Allow users to select the bind address (ip) to use with `--bind`

The default is still `0.0.0.0`

I'm no network expert, so some of these names could probably be improved

* Use the local_addr reported back to us

* Fix doc-tests

* docstring fix

* Replace the unspecified "0.0.0.0" ip with "localhost"

* Fix doctest
  • Loading branch information
emilk committed May 19, 2023
1 parent 09c255c commit 8d97093
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 55 deletions.
7 changes: 4 additions & 3 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,19 @@ async fn listen_for_new_clients(
/// # use re_sdk_comms::{serve, ServerOptions};
/// #[tokio::main]
/// async fn main() {
/// let (sender, receiver) = tokio::sync::broadcast::channel(1);
/// let log_msg_rx = serve(80, ServerOptions::default(), receiver).await.unwrap();
/// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
/// let log_msg_rx = serve("0.0.0.0", 80, ServerOptions::default(), shutdown_rx).await.unwrap();
/// }
/// ```
pub async fn serve(
bind_ip: &str,
port: u16,
options: ServerOptions,
shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<Receiver<LogMsg>> {
let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port });

let bind_addr = format!("0.0.0.0:{port}");
let bind_addr = format!("{bind_ip}:{port}");
let listener = TcpListener::bind(&bind_addr).await.with_context(|| {
format!(
"Failed to bind TCP address {bind_addr:?}. Another Rerun instance is probably running."
Expand Down
7 changes: 6 additions & 1 deletion crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,12 @@ fn get_url(info: &eframe::IntegrationInfo) -> String {
url = param.clone();
}
if url.is_empty() {
re_ws_comms::server_url(&info.web_info.location.hostname, Default::default())
format!(
"{}://{}:{}",
re_ws_comms::PROTOCOL,
&info.web_info.location.hostname,
re_ws_comms::DEFAULT_WS_SERVER_PORT
)
} else {
url
}
Expand Down
47 changes: 33 additions & 14 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use std::{
fmt::Display,
net::SocketAddr,
str::FromStr,
task::{Context, Poll},
};
Expand Down Expand Up @@ -175,7 +176,7 @@ impl<T> Service<T> for MakeSvc {
pub struct WebViewerServerPort(pub u16);

impl WebViewerServerPort {
/// Port to use with [`WebViewerServer::port`] when you want the OS to pick a port for you.
/// Port to use with [`WebViewerServer::new`] when you want the OS to pick a port for you.
///
/// This is defined as `0`.
pub const AUTO: Self = Self(0);
Expand Down Expand Up @@ -222,7 +223,7 @@ impl WebViewerServer {
/// # async fn example() -> Result<(), WebViewerServerError> {
/// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
/// let server = WebViewerServer::new("0.0.0.0", WebViewerServerPort::AUTO)?;
/// let port = server.port();
/// let server_url = server.server_url();
/// server.serve(shutdown_rx).await?;
/// # Ok(()) }
/// ```
Expand All @@ -247,22 +248,23 @@ impl WebViewerServer {
Ok(())
}

pub fn port(&self) -> WebViewerServerPort {
WebViewerServerPort(self.server.local_addr().port())
/// Includes `http://` prefix
pub fn server_url(&self) -> String {
server_url(&self.server.local_addr())
}
}

/// Sync handle for the [`WebViewerServer`]
///
/// When dropped, the server will be shut down.
pub struct WebViewerServerHandle {
port: WebViewerServerPort,
local_addr: std::net::SocketAddr,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}

impl Drop for WebViewerServerHandle {
fn drop(&mut self) {
re_log::info!("Shutting down web server on port {}.", self.port);
re_log::info!("Shutting down web server on {}", self.server_url());
self.shutdown_tx.send(()).ok();
}
}
Expand All @@ -274,22 +276,39 @@ impl WebViewerServerHandle {
/// A port of 0 will let the OS choose a free port.
///
/// The caller needs to ensure that there is a `tokio` runtime running.
pub fn new(requested_port: WebViewerServerPort) -> Result<Self, WebViewerServerError> {
pub fn new(
bind_ip: &str,
requested_port: WebViewerServerPort,
) -> Result<Self, WebViewerServerError> {
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

let web_server = WebViewerServer::new("0.0.0.0", requested_port)?;
let web_server = WebViewerServer::new(bind_ip, requested_port)?;

let port = web_server.port();
let local_addr = web_server.server.local_addr();

tokio::spawn(async move { web_server.serve(shutdown_rx).await });

re_log::info!("Started web server on port {}.", port);
let slf = Self {
local_addr,
shutdown_tx,
};

re_log::info!("Started web server on {}", slf.server_url());

Ok(Self { port, shutdown_tx })
Ok(slf)
}

/// Get the port where the HTTP server is listening
pub fn port(&self) -> WebViewerServerPort {
self.port
/// Includes `http://` prefix
pub fn server_url(&self) -> String {
server_url(&self.local_addr)
}
}

fn server_url(local_addr: &SocketAddr) -> String {
if local_addr.ip().is_unspecified() {
// "0.0.0.0"
format!("http://localhost:{}", local_addr.port())
} else {
format!("http://{local_addr}")
}
}
3 changes: 1 addition & 2 deletions crates/re_web_viewer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ async fn main() {
)
.expect("Could not create web server");

let port = server.port();
let url = format!("http://{bind_ip}:{port}");
let url = server.server_url();
eprintln!("Hosting web-viewer on {url}");

if args.open {
Expand Down
10 changes: 8 additions & 2 deletions crates/re_ws_comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,14 @@ impl FromStr for RerunServerPort {
}
}

pub fn server_url(hostname: &str, port: RerunServerPort) -> String {
format!("{PROTOCOL}://{hostname}:{port}")
/// Add a protocol (`ws://` or `wss://`) to the given address.
pub fn server_url(local_addr: &std::net::SocketAddr) -> String {
if local_addr.ip().is_unspecified() {
// "0.0.0.0"
format!("{PROTOCOL}://localhost:{}", local_addr.port())
} else {
format!("{PROTOCOL}://{local_addr}")
}
}

const PREFIX: [u8; 4] = *b"RR00";
Expand Down
42 changes: 24 additions & 18 deletions crates/re_ws_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,33 @@ use crate::{server_url, RerunServerError, RerunServerPort};
/// Websocket host for relaying [`LogMsg`]s to a web viewer.
pub struct RerunServer {
listener: TcpListener,
port: RerunServerPort,
local_addr: std::net::SocketAddr,
}

impl RerunServer {
/// Create new [`RerunServer`] to relay [`LogMsg`]s to a websocket.
/// The websocket will be available at `port`.
///
/// A `bind_ip` of `"0.0.0.0"` is a good default.
/// A port of 0 will let the OS choose a free port.
pub async fn new(port: RerunServerPort) -> Result<Self, RerunServerError> {
let bind_addr = format!("0.0.0.0:{port}");
pub async fn new(bind_ip: String, port: RerunServerPort) -> Result<Self, RerunServerError> {
let bind_addr = format!("{bind_ip}:{port}");

let listener = TcpListener::bind(&bind_addr)
.await
.map_err(|err| RerunServerError::BindFailed(port, err))?;

let port = RerunServerPort(listener.local_addr()?.port());
let slf = Self {
local_addr: listener.local_addr()?,
listener,
};

re_log::info!(
"Listening for websocket traffic on {}. Connect with a Rerun Web Viewer.",
listener.local_addr()?
slf.server_url()
);

Ok(Self { listener, port })
Ok(slf)
}

/// Accept new connections until we get a message on `shutdown_rx`
Expand Down Expand Up @@ -74,22 +78,23 @@ impl RerunServer {
}
}

/// Contains the `ws://` or `wss://` prefix.
pub fn server_url(&self) -> String {
server_url("localhost", self.port)
server_url(&self.local_addr)
}
}

/// Sync handle for the [`RerunServer`]
///
/// When dropped, the server will be shut down.
pub struct RerunServerHandle {
port: RerunServerPort,
local_addr: std::net::SocketAddr,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}

impl Drop for RerunServerHandle {
fn drop(&mut self) {
re_log::info!("Shutting down Rerun server on port {}.", self.port);
re_log::info!("Shutting down Rerun server on {}", self.server_url());
self.shutdown_tx.send(()).ok();
}
}
Expand All @@ -98,35 +103,36 @@ impl RerunServerHandle {
/// Create new [`RerunServer`] to relay [`LogMsg`]s to a websocket.
/// Returns a [`RerunServerHandle`] that will shutdown the server when dropped.
///
/// A `bind_ip` of `"0.0.0.0"` is a good default.
/// A port of 0 will let the OS choose a free port.
///
/// The caller needs to ensure that there is a `tokio` runtime running.
pub fn new(
rerun_rx: Receiver<LogMsg>,
bind_ip: String,
requested_port: RerunServerPort,
) -> Result<Self, RerunServerError> {
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

let rt = tokio::runtime::Handle::current();

let ws_server = rt.block_on(tokio::spawn(async move {
RerunServer::new(requested_port).await
RerunServer::new(bind_ip, requested_port).await
}))??;

let port = ws_server.port;
let local_addr = ws_server.local_addr;

tokio::spawn(async move { ws_server.listen(rerun_rx, shutdown_rx).await });

Ok(Self { port, shutdown_tx })
}

/// Get the port where the websocket server is listening
pub fn port(&self) -> RerunServerPort {
self.port
Ok(Self {
local_addr,
shutdown_tx,
})
}

/// Contains the `ws://` or `wss://` prefix.
pub fn server_url(&self) -> String {
server_url("localhost", self.port)
server_url(&self.local_addr)
}
}

Expand Down
5 changes: 5 additions & 0 deletions crates/rerun/src/clap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub struct RerunArgs {
#[cfg(feature = "web_viewer")]
#[clap(long)]
serve: bool,

/// What bind address IP to use.
#[clap(long, default_value = "0.0.0.0")]
bind: String,
}

impl RerunArgs {
Expand Down Expand Up @@ -111,6 +115,7 @@ impl RerunArgs {
let open_browser = true;
crate::web_viewer::new_sink(
open_browser,
&self.bind,
WebViewerServerPort::default(),
RerunServerPort::default(),
)?
Expand Down
17 changes: 15 additions & 2 deletions crates/rerun/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ struct Args {
#[clap(long)]
web_viewer: bool,

/// What bind address IP to use.
#[clap(long, default_value = "0.0.0.0")]
bind: String,

/// What port do we listen to for hosting the web viewer over HTTP.
/// A port of 0 will pick a random port.
#[cfg(feature = "web_viewer")]
Expand Down Expand Up @@ -339,6 +343,7 @@ async fn run_impl(
#[cfg(feature = "web_viewer")]
{
let web_viewer = host_web_viewer(
args.bind.clone(),
args.web_viewer_port,
true,
rerun_server_ws_url,
Expand Down Expand Up @@ -411,7 +416,13 @@ async fn run_impl(
// `rerun.spawn()` doesn't need to log that a connection has been made
quiet: call_source.is_python(),
};
re_sdk_comms::serve(args.port, server_options, shutdown_rx.resubscribe()).await?
re_sdk_comms::serve(
&args.bind,
args.port,
server_options,
shutdown_rx.resubscribe(),
)
.await?
}

#[cfg(not(feature = "server"))]
Expand Down Expand Up @@ -443,12 +454,14 @@ async fn run_impl(
let shutdown_web_viewer = shutdown_rx.resubscribe();

// This is the server which the web viewer will talk to:
let ws_server = re_ws_comms::RerunServer::new(args.ws_server_port).await?;
let ws_server =
re_ws_comms::RerunServer::new(args.bind.clone(), args.ws_server_port).await?;
let ws_server_url = ws_server.server_url();
let ws_server_handle = tokio::spawn(ws_server.listen(rx, shutdown_ws_server));

// This is the server that serves the Wasm+HTML:
let web_server_handle = tokio::spawn(host_web_viewer(
args.bind.clone(),
args.web_viewer_port,
true,
ws_server_url,
Expand Down
Loading

0 comments on commit 8d97093

Please sign in to comment.