Skip to content

Commit

Permalink
feat(rust): address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
etorreborre committed Feb 18, 2025
1 parent 21125ec commit e6282c7
Show file tree
Hide file tree
Showing 39 changed files with 84 additions and 82 deletions.
2 changes: 1 addition & 1 deletion examples/rust/rendezvous/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ockam_transport_udp::{UdpBindArguments, UdpBindOptions, UdpTransport, UDP};

#[ockam_macros::node]
async fn main(ctx: Context) -> Result<()> {
let udp = UdpTransport::create(&ctx)?;
let udp = UdpTransport::get_or_create(&ctx)?;

let bind = udp
.bind(UdpBindArguments::new(), UdpBindOptions::new())
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/rendezvous/src/bin/echo_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ockam_transport_udp::{UdpBindArguments, UdpBindOptions, UdpTransport};

#[ockam_macros::node]
async fn main(ctx: Context) -> Result<()> {
let udp = UdpTransport::create(&ctx)?;
let udp = UdpTransport::get_or_create(&ctx)?;
let bind = udp
.bind(
UdpBindArguments::new().with_bind_address("127.0.0.1:8000")?,
Expand Down
16 changes: 8 additions & 8 deletions implementations/rust/ockam/ockam/tests/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn test2(ctx: &mut Context) -> Result<()> {
.service_as_consumer(&tcp_listener_options.spawner_flow_control_id())
.relay_as_consumer(&tcp_listener_options.spawner_flow_control_id());
RelayService::create(ctx, "forwarding_service", options)?;
let cloud_tcp = TcpTransport::create(ctx)?;
let cloud_tcp = TcpTransport::get_or_create(ctx)?;

let cloud_listener = cloud_tcp
.listen("127.0.0.1:0", tcp_listener_options)
Expand All @@ -49,15 +49,15 @@ async fn test2(ctx: &mut Context) -> Result<()> {
ctx.flow_controls()
.add_consumer(&"echoer".into(), &tcp_options.flow_control_id());

let server_tcp = TcpTransport::create(ctx)?;
let server_tcp = TcpTransport::get_or_create(ctx)?;
let cloud_connection = server_tcp
.connect(cloud_listener.socket_string(), tcp_options)
.await?;

let remote_info =
RemoteRelay::create(ctx, cloud_connection.clone(), RemoteRelayOptions::new()).await?;

let client_tcp = TcpTransport::create(ctx)?;
let client_tcp = TcpTransport::get_or_create(ctx)?;
let cloud_connection = client_tcp
.connect(cloud_listener.socket_string(), TcpConnectionOptions::new())
.await?;
Expand All @@ -82,15 +82,15 @@ async fn test3(ctx: &mut Context) -> Result<()> {
.service_as_consumer(&tcp_listener_options.spawner_flow_control_id())
.relay_as_consumer(&tcp_listener_options.spawner_flow_control_id());
RelayService::create(ctx, "forwarding_service", options)?;
let cloud_tcp = TcpTransport::create(ctx)?;
let cloud_tcp = TcpTransport::get_or_create(ctx)?;
let cloud_listener = cloud_tcp
.listen("127.0.0.1:0", tcp_listener_options)
.await?;

let tcp_options = TcpConnectionOptions::new();
let server_tcp_flow_control_id = tcp_options.flow_control_id();

let server_tcp = TcpTransport::create(ctx)?;
let server_tcp = TcpTransport::get_or_create(ctx)?;
let cloud_connection = server_tcp
.connect(cloud_listener.socket_string(), tcp_options)
.await?;
Expand Down Expand Up @@ -171,7 +171,7 @@ async fn test4(ctx: &mut Context) -> Result<()> {
cloud_secure_channel_listener_options,
)?;

let cloud_tcp = TcpTransport::create(ctx)?;
let cloud_tcp = TcpTransport::get_or_create(ctx)?;
let cloud_listener = cloud_tcp
.listen("127.0.0.1:0", cloud_tcp_listener_options)
.await?;
Expand All @@ -187,7 +187,7 @@ async fn test4(ctx: &mut Context) -> Result<()> {
&server_secure_channel_listener_options.spawner_flow_control_id(),
);

let server_tcp = TcpTransport::create(ctx)?;
let server_tcp = TcpTransport::get_or_create(ctx)?;
let cloud_server_connection = server_tcp
.connect(cloud_listener.socket_string(), TcpConnectionOptions::new())
.await?;
Expand All @@ -211,7 +211,7 @@ async fn test4(ctx: &mut Context) -> Result<()> {
RemoteRelay::create(ctx, cloud_server_channel.clone(), RemoteRelayOptions::new()).await?;

// Client
let client_tcp = TcpTransport::create(ctx)?;
let client_tcp = TcpTransport::get_or_create(ctx)?;
let cloud_client_connection = client_tcp
.connect(cloud_listener.socket_string(), TcpConnectionOptions::new())
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl Authority {
info!("started a secure channel listener with name '{listener_name}'");

// Create a TCP listener and wait for incoming connections
let tcp = TcpTransport::create(ctx)?;
let tcp = TcpTransport::get_or_create(ctx)?;

let listener = tcp
.listen(
Expand Down Expand Up @@ -562,7 +562,7 @@ pub mod tests {
caller: &Identifier,
) -> Result<AuthorityNodeClient> {
let client = NodeManager::authority_node_client(
TcpTransport::create(ctx)?,
TcpTransport::get_or_create(ctx)?,
secure_channels,
authority_identifier,
authority_route,
Expand Down
2 changes: 0 additions & 2 deletions implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ use crate::config::lookup::InternetAddress;

use crate::{fmt_warn, ConnectionStatus};

pub const DEFAULT_NODE_NAME: &str = "_default_node_name";

/// The methods below support the creation and update of local nodes
impl CliState {
/// Create a node, with some optional associated values, and start it
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cli_state::DEFAULT_NODE_NAME;
use crate::cli_state::random_name;
use crate::config::UrlVar;
use crate::logs::default_values::*;
use crate::logs::env_variables::*;
Expand Down Expand Up @@ -513,15 +513,15 @@ async fn make_secure_client(
node
} else {
cli_state
.create_node_with_optional_identity(DEFAULT_NODE_NAME, &None)
.create_node_with_optional_identity(&random_name(), &None)
.await?
};
let secure_channels = cli_state.secure_channels(&default_node.name()).await?;

Ok(SecureClient::new(
secure_channels,
None,
TcpTransport::create(ctx)?,
TcpTransport::get_or_create(ctx)?,
project_route,
Arc::new(TrustIdentifierPolicy::new(identifier)),
&default_node.identifier(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl GrpcForwarder {

/// Forward an http Request.
/// We don't wait for a response here.
async fn forward_http_request(&mut self, request: http::Request<BoxBody>) -> Result<()> {
async fn forward_grpc_request(&mut self, request: http::Request<BoxBody>) -> Result<()> {
self.ready().await.map_err(ApiError::core)?;
let _ = self
.channel
Expand Down Expand Up @@ -75,7 +75,7 @@ impl Worker for GrpcForwarder {
let http_request = ockam_request.make_http_request().map_err(|e| {
ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}"))
})?;
self.forward_http_request(http_request)
self.forward_grpc_request(http_request)
.await
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Io, format!("{e:?}")))?;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ mod tests {
// wait until the forwarder node is up
tokio::time::sleep(Duration::from_millis(100)).await;
let secure_channels = create_secure_channels().await?;
let tcp_transport = TcpTransport::create(&ctx)?;
let tcp_transport = TcpTransport::get_or_create(&ctx)?;
let secure_client = make_secure_client(port, secure_channels, tcp_transport).await?;
let project_service =
SecureClientService::new(secure_client, &ctx, DefaultAddress::GRPC_FORWARDER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub(crate) mod tests {
// wait until the forwarder node is up
tokio::time::sleep(Duration::from_millis(100)).await;
let secure_channels = create_secure_channels().await?;
let tcp_transport = TcpTransport::create(&ctx)?;
let tcp_transport = TcpTransport::get_or_create(&ctx)?;
let secure_client = make_secure_client(port, secure_channels, tcp_transport).await?;
let project_service =
SecureClientService::new(secure_client, &ctx, DefaultAddress::GRPC_FORWARDER);
Expand Down Expand Up @@ -245,7 +245,7 @@ pub(crate) mod tests {
}

pub(crate) async fn start_tcp_listener(ctx: &Context, port: u16) -> Result<TcpListenerOptions> {
let tcp_transport = TcpTransport::create(ctx)?;
let tcp_transport = TcpTransport::get_or_create(ctx)?;
let tcp_listener_options = TcpListenerOptions::new();
let _tcp_listener = tcp_transport
.listen(format!("127.0.0.1:{port}"), tcp_listener_options.clone())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tonic::codegen::Service;
/// SecureClient.
///
/// The Context is an Option since because it is necessary to clone this struct and attempting
/// to attempt a Context could fail if the current node is being shut down.
/// to clone a Context could fail if the current node is being shut down.
///
pub struct SecureClientService {
secure_client: SecureClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl BackgroundNodeClient {
cli_state: &CliState,
node_name: &str,
) -> miette::Result<BackgroundNodeClient> {
let tcp_transport = TcpTransport::create(ctx).into_diagnostic()?;
let tcp_transport = TcpTransport::get_or_create(ctx).into_diagnostic()?;
BackgroundNodeClient::new(&tcp_transport, cli_state, node_name)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl InMemoryNode {
) -> miette::Result<InMemoryNode> {
let defaults = NodeManagerDefaults::default();

let tcp = TcpTransport::create(ctx).into_diagnostic()?;
let tcp = TcpTransport::get_or_create(ctx).into_diagnostic()?;
let tcp_listener = tcp
.listen(
defaults.tcp_listener_address.as_str(),
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_api/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub async fn start_manager_for_tests(
bind_address: Option<&str>,
trust_options: Option<NodeManagerTrustOptions>,
) -> Result<NodeManagerHandle> {
let tcp = TcpTransport::create(context)?;
let tcp = TcpTransport::get_or_create(context)?;
let tcp_listener = tcp
.listen(
bind_address.unwrap_or("127.0.0.1:0"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub async fn start_authority(
.await?;

let authority_node_client = NodeManager::authority_node_client(
TcpTransport::create(ctx)?,
TcpTransport::get_or_create(ctx)?,
secure_channels.clone(),
&configuration.identifier,
&MultiAddr::try_from("/secure/api")?,
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_app_lib/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ pub(crate) async fn make_node_manager(
ctx: Arc<Context>,
cli_state: &CliState,
) -> miette::Result<Arc<InMemoryNode>> {
let tcp = TcpTransport::create(&ctx).into_diagnostic()?;
let tcp = TcpTransport::get_or_create(&ctx).into_diagnostic()?;
let options = TcpListenerOptions::new();
let listener = tcp
.listen(&"127.0.0.1:0", options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod config;
pub mod foreground;
pub mod node_callback;

const DEFAULT_NODE_NAME: &str = "_default_node_name";
const LONG_ABOUT: &str = include_str!("./static/create/long_about.txt");
const AFTER_LONG_HELP: &str = include_str!("./static/create/after_long_help.txt");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::node::create::DEFAULT_NODE_NAME;
use crate::node::node_callback::NodeCallback;
use crate::node::CreateCommand;
use crate::service::config::ControlApiNodeResolution;
Expand All @@ -8,7 +9,7 @@ use miette::IntoDiagnostic;
use ockam::tcp::{TcpListenerOptions, TcpTransport};
use ockam::udp::{UdpBindArguments, UdpBindOptions, UdpTransport};
use ockam::{Address, Context};
use ockam_api::cli_state::{random_name, DEFAULT_NODE_NAME};
use ockam_api::cli_state::random_name;
use ockam_api::colors::color_primary;
use ockam_api::control_api::frontend::NodeResolution;
use ockam_api::fmt_log;
Expand Down Expand Up @@ -47,7 +48,7 @@ impl CreateCommand {
.into_diagnostic()?;

// Create TCP transport
let tcp = TcpTransport::create(ctx).into_diagnostic()?;
let tcp = TcpTransport::get_or_create(ctx).into_diagnostic()?;
let tcp_listener = tcp
.listen(&self.tcp_listener_address, TcpListenerOptions::new())
.await
Expand All @@ -74,7 +75,7 @@ impl CreateCommand {
debug!("node info persisted {node_info:?}");

let udp_options = if self.udp {
let udp = UdpTransport::create(ctx).into_diagnostic()?;
let udp = UdpTransport::get_or_create(ctx).into_diagnostic()?;
let options = UdpBindOptions::new();
let flow_control_id = options.flow_control_id();
udp.bind(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl CreateCommand {

RendezvousService::start(ctx, DefaultAddress::RENDEZVOUS_SERVICE).into_diagnostic()?;

let udp = UdpTransport::create(ctx).into_diagnostic()?;
let udp = UdpTransport::get_or_create(ctx).into_diagnostic()?;
let bind = udp
.bind(
UdpBindArguments::new().with_bind_socket_address(udp_address),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Command for GetMyAddressCommand {
const NAME: &'static str = "rendezvous get-my-address";

async fn run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> {
let udp = UdpTransport::create(ctx)?;
let udp = UdpTransport::get_or_create(ctx)?;
let bind = udp
.bind(
UdpBindArguments::new().with_bind_address("0.0.0.0:0")?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ mod common;
// Bob: TCP listener + Secure Channel listener
#[ockam_macros::test]
async fn test1(ctx: &mut Context) -> Result<()> {
let tcp_bob = TcpTransport::create(ctx)?;
let tcp_bob = TcpTransport::get_or_create(ctx)?;
let listener = tcp_bob
.listen("127.0.0.1:0", TcpListenerOptions::new())
.await?;

let tcp_alice = TcpTransport::create(ctx)?;
let tcp_alice = TcpTransport::get_or_create(ctx)?;
let connection_to_bob = tcp_alice
.connect(listener.socket_string(), TcpConnectionOptions::new())
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ async fn init(
ttl: Duration,
timing_options: RemoteCredentialRetrieverTimingOptions,
) -> Result<InitResult> {
let tcp = TcpTransport::create(ctx)?;
let tcp = TcpTransport::get_or_create(ctx)?;

let client_secure_channels = secure_channels().await?;
let authority_secure_channels = secure_channels().await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn test_update_decryptor_route(ctx: &mut Context) -> Result<()> {

#[ockam_macros::test]
async fn test_update_decryptor_route_tcp(ctx: &mut Context) -> Result<()> {
let tcp = TcpTransport::create(ctx)?;
let tcp = TcpTransport::get_or_create(ctx)?;

let tcp_listener1 = tcp.listen("127.0.0.1:0", TcpListenerOptions::new()).await?;
let tcp_listener2 = tcp.listen("127.0.0.1:0", TcpListenerOptions::new()).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl TcpTransport {
/// # use ockam_node::Context;
/// # use ockam_core::Result;
/// # async fn test(ctx: Context) -> Result<()> {
/// let tcp = TcpTransport::create(&ctx)?;
/// let tcp = TcpTransport::get_or_create(&ctx)?;
/// tcp.listen("127.0.0.1:8000", TcpListenerOptions::new()).await?; // Listen on port 8000
/// let connection = tcp.connect("127.0.0.1:5000", TcpConnectionOptions::new()).await?; // and connect to port 5000
/// # Ok(()) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ impl TcpTransport {
/// # use ockam_node::Context;
/// # use ockam_core::Result;
/// # async fn test(ctx: Context) -> Result<()> {
/// let tcp = TcpTransport::create(&ctx)?;
/// let tcp = TcpTransport::get_or_create(&ctx)?;
/// # Ok(()) }
/// ```
#[instrument(name = "create tcp transport", skip_all)]
pub fn create(ctx: &Context) -> Result<Arc<TcpTransport>> {
#[instrument(name = "get or create tcp transport", skip_all)]
pub fn get_or_create(ctx: &Context) -> Result<Arc<TcpTransport>> {
// don't register the TCP transport twice
match ctx.get_transport(TCP) {
Some(t) => {
Expand Down Expand Up @@ -166,7 +166,7 @@ mod tests {

#[ockam_macros::test]
async fn test_resolve_address(ctx: &mut Context) -> Result<()> {
let tcp = TcpTransport::create(ctx)?;
let tcp = TcpTransport::get_or_create(ctx)?;
let tcp_address = "127.0.0.1:0";
let initial_workers = ctx.list_workers()?;
let listener = TcpListener::bind(tcp_address)
Expand Down Expand Up @@ -205,7 +205,7 @@ mod tests {

#[ockam_macros::test]
async fn test_resolve_route_with_dns_address(ctx: &mut Context) -> Result<()> {
let tcp = TcpTransport::create(ctx)?;
let tcp = TcpTransport::get_or_create(ctx)?;
let tcp_address = "127.0.0.1:0";
let listener = TcpListener::bind(tcp_address)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl TcpTransport {
/// # use ockam_node::Context;
/// # use ockam_core::Result;
/// # async fn test(ctx: Context) -> Result<()> {
/// let tcp = TcpTransport::create(&ctx)?;
/// let tcp = TcpTransport::get_or_create(&ctx)?;
/// tcp.listen("127.0.0.1:8000", TcpListenerOptions::new()).await?;
/// # Ok(()) }
pub async fn listen(
Expand Down
Loading

0 comments on commit e6282c7

Please sign in to comment.