Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ext/net): Add Conn.setNoDelay and Conn.setKeepAlive #13103

Merged
merged 16 commits into from
Jan 31, 2022
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 72 additions & 0 deletions cli/tests/unit/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,78 @@ Deno.test({ permissions: { net: true } }, async function netTcpDialListen() {
conn.close();
});

Deno.test({ permissions: { net: true } }, async function netTcpSetNoDelay() {
const listener = Deno.listen({ port: 3500 });
listener.accept().then(
async (conn) => {
assert(conn.remoteAddr != null);
assert(conn.localAddr.transport === "tcp");
assertEquals(conn.localAddr.hostname, "127.0.0.1");
assertEquals(conn.localAddr.port, 3500);
await conn.write(new Uint8Array([1, 2, 3]));
conn.close();
},
);

const conn = await Deno.connect({ hostname: "127.0.0.1", port: 3500 });
conn.setNoDelay(true);
assert(conn.remoteAddr.transport === "tcp");
assertEquals(conn.remoteAddr.hostname, "127.0.0.1");
assertEquals(conn.remoteAddr.port, 3500);
assert(conn.localAddr != null);
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEquals(3, readResult);
assertEquals(1, buf[0]);
assertEquals(2, buf[1]);
assertEquals(3, buf[2]);
assert(conn.rid > 0);

assert(readResult !== null);
Copy link

@lucasfcosta lucasfcosta Dec 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering that:

  1. L262 already checks whether readResult equals 3
  2. readResult is a const and assigned a primitive

Do we need this assertion or is it redundant?


const readResult2 = await conn.read(buf);
assertEquals(readResult2, null);

listener.close();
conn.close();
});

Deno.test({ permissions: { net: true } }, async function netTcpSetKeepAlive() {
const listener = Deno.listen({ port: 3500 });
listener.accept().then(
async (conn) => {
assert(conn.remoteAddr != null);
assert(conn.localAddr.transport === "tcp");
assertEquals(conn.localAddr.hostname, "127.0.0.1");
assertEquals(conn.localAddr.port, 3500);
await conn.write(new Uint8Array([1, 2, 3]));
conn.close();
},
);

const conn = await Deno.connect({ hostname: "127.0.0.1", port: 3500 });
conn.setKeepAlive(true);
assert(conn.remoteAddr.transport === "tcp");
assertEquals(conn.remoteAddr.hostname, "127.0.0.1");
assertEquals(conn.remoteAddr.port, 3500);
assert(conn.localAddr != null);
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEquals(3, readResult);
assertEquals(1, buf[0]);
assertEquals(2, buf[1]);
assertEquals(3, buf[2]);
assert(conn.rid > 0);

assert(readResult !== null);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, since readResult is a const to which a primitive gets assigned and whose value we assert upon on L298, isn't this assertion redundant?

Apologies if I'm missing anything here (this is also probably just a nitpick anyway, so I'm sorry).


const readResult2 = await conn.read(buf);
assertEquals(readResult2, null);

listener.close();
conn.close();
});

Deno.test(
{
ignore: Deno.build.os === "windows",
Expand Down
8 changes: 8 additions & 0 deletions ext/net/01_net.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@
closeWrite() {
return shutdown(this.rid);
}

setNoDelay(nodelay = true) {
return core.opSync("op_set_nodelay", this.rid, nodelay);
}

setKeepAlive(keepalive = true) {
return core.opSync("op_set_keepalive", this.rid, keepalive);
}
}

class Listener {
Expand Down
1 change: 1 addition & 0 deletions ext/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ deno_core = { version = "0.117.0", path = "../../core" }
deno_tls = { version = "0.22.0", path = "../tls" }
log = "0.4.14"
serde = { version = "1.0.129", features = ["derive"] }
socket2 = "0.4.2"
tokio = { version = "1.10.1", features = ["full"] }
trust-dns-proto = "0.20.3"
trust-dns-resolver = { version = "0.20.3", features = ["tokio-runtime", "serde-config"] }
30 changes: 30 additions & 0 deletions ext/net/io.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
Expand All @@ -9,6 +10,7 @@ use deno_core::CancelTryFuture;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use socket2::SockRef;
use std::borrow::Cow;
use std::rc::Rc;
use tokio::io::AsyncRead;
Expand Down Expand Up @@ -118,6 +120,34 @@ impl Resource for TcpStreamResource {
}
}

impl TcpStreamResource {
pub fn set_nodelay(self: Rc<Self>, nodelay: bool) -> Result<(), AnyError> {
self.map_socket(Box::new(move |socket| Ok(socket.set_nodelay(nodelay)?)))
}

pub fn set_keepalive(
self: Rc<Self>,
keepalive: bool,
) -> Result<(), AnyError> {
self
.map_socket(Box::new(move |socket| Ok(socket.set_keepalive(keepalive)?)))
}

fn map_socket(
self: Rc<Self>,
map: Box<dyn FnOnce(SockRef) -> Result<(), AnyError>>,
) -> Result<(), AnyError> {
if let Some(wr) = RcRef::map(self, |r| &r.wr).try_borrow() {
let stream = wr.as_ref().as_ref();
let socket = socket2::SockRef::from(stream);

return map(socket);
}

Err(generic_error("Unable to get resources"))
}
}

#[cfg(unix)]
pub type UnixStreamResource =
FullDuplexResource<unix::OwnedReadHalf, unix::OwnedWriteHalf>;
Expand Down
4 changes: 4 additions & 0 deletions ext/net/lib.deno_net.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ declare namespace Deno {
/** Shuts down (`shutdown(2)`) the write side of the connection. Most
* callers should just use `close()`. */
closeWrite(): Promise<void>;
/** Enable/disable the use of Nagle's algorithm. Defaults to true */
setNoDelay(nodelay?: boolean): void;
/** Enable/disable keep-alive functionality */
setKeepAlive(keepalive?: boolean): void;
}

// deno-lint-ignore no-empty-interface
Expand Down
124 changes: 124 additions & 0 deletions ext/net/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> {
("op_dgram_recv", op_async(op_dgram_recv)),
("op_dgram_send", op_async(op_dgram_send::<P>)),
("op_dns_resolve", op_async(op_dns_resolve::<P>)),
("op_set_nodelay", op_sync(op_set_nodelay::<P>)),
("op_set_keepalive", op_sync(op_set_keepalive::<P>)),
]
}

Expand Down Expand Up @@ -665,6 +667,26 @@ where
Ok(results)
}

pub fn op_set_nodelay<NP>(
state: &mut OpState,
rid: ResourceId,
nodelay: bool,
) -> Result<(), AnyError> {
let resource: Rc<TcpStreamResource> =
state.resource_table.get::<TcpStreamResource>(rid)?;
resource.set_nodelay(nodelay)
}

pub fn op_set_keepalive<NP>(
state: &mut OpState,
rid: ResourceId,
keepalive: bool,
) -> Result<(), AnyError> {
let resource: Rc<TcpStreamResource> =
state.resource_table.get::<TcpStreamResource>(rid)?;
resource.set_keepalive(keepalive)
}

fn rdata_to_return_record(
ty: RecordType,
) -> impl Fn(&RData) -> Option<DnsReturnRecord> {
Expand Down Expand Up @@ -717,8 +739,13 @@ fn rdata_to_return_record(
#[cfg(test)]
mod tests {
use super::*;
use deno_core::Extension;
use deno_core::JsRuntime;
use deno_core::RuntimeOptions;
use socket2::SockRef;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::path::Path;
use trust_dns_proto::rr::rdata::mx::MX;
use trust_dns_proto::rr::rdata::srv::SRV;
use trust_dns_proto::rr::rdata::txt::TXT;
Expand Down Expand Up @@ -810,4 +837,101 @@ mod tests {
]))
);
}

struct TestPermission {}

impl NetPermissions for TestPermission {
fn check_net<T: AsRef<str>>(
&mut self,
_host: &(T, Option<u16>),
) -> Result<(), AnyError> {
Ok(())
}

fn check_read(&mut self, _p: &Path) -> Result<(), AnyError> {
Ok(())
}

fn check_write(&mut self, _p: &Path) -> Result<(), AnyError> {
Ok(())
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn tcp_set_no_delay() {
let set_nodelay = Box::new(|state: &mut OpState, rid| {
op_set_nodelay::<TestPermission>(state, rid, true).unwrap();
});
let test_fn = Box::new(|socket: SockRef| {
assert!(socket.nodelay().unwrap());
assert!(!socket.keepalive().unwrap());
});
check_sockopt(String::from("127.0.0.1:4245"), set_nodelay, test_fn).await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn tcp_set_keepalive() {
let set_keepalive = Box::new(|state: &mut OpState, rid| {
op_set_keepalive::<TestPermission>(state, rid, true).unwrap();
});
let test_fn = Box::new(|socket: SockRef| {
assert!(!socket.nodelay().unwrap());
assert!(socket.keepalive().unwrap());
});
check_sockopt(String::from("127.0.0.1:4246"), set_keepalive, test_fn).await;
}

async fn check_sockopt(
addr: String,
set_sockopt_fn: Box<dyn Fn(&mut OpState, u32)>,
test_fn: Box<dyn FnOnce(SockRef)>,
) {
let clone_addr = addr.clone();
tokio::spawn(async move {
let listener = TcpListener::bind(addr).await.unwrap();
let _ = listener.accept().await;
});
let my_ext = Extension::builder()
.state(move |state| {
state.put(TestPermission {});
Ok(())
})
.build();

let mut runtime = JsRuntime::new(RuntimeOptions {
extensions: vec![my_ext],
..Default::default()
});

let conn_state = runtime.op_state();

let server_addr: Vec<&str> = clone_addr.split(':').collect();
let ip_args = IpListenArgs {
hostname: String::from(server_addr[0]),
port: server_addr[1].parse().unwrap(),
};
let connect_args = ConnectArgs {
transport: String::from("tcp"),
transport_args: ArgsEnum::Ip(ip_args),
};

let connect_fut =
op_net_connect::<TestPermission>(conn_state, connect_args, ());
let conn = connect_fut.await.unwrap();

let rid = conn.rid;
let state = runtime.op_state();
set_sockopt_fn(&mut state.borrow_mut(), rid);

let resource = state
.borrow_mut()
.resource_table
.get::<TcpStreamResource>(rid)
.unwrap();

let wr = resource.wr_borrow_mut().await;
let stream = wr.as_ref().as_ref();
let socket = socket2::SockRef::from(stream);
test_fn(socket);
}
}