Skip to content

Commit

Permalink
[net/client/redundant] Become generic over the connection type
Browse files Browse the repository at this point in the history
This removes a layer of vtables that may not be necessary in some
applications.  However, it complicates the typing -- we'll see whether
it's worthwhile.
  • Loading branch information
bal-e committed Dec 4, 2024
1 parent 880973e commit 4ffcbc5
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 38 deletions.
10 changes: 6 additions & 4 deletions examples/client-transports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,14 @@ async fn main() {
drop(request);

// Create a transport connection for redundant connections.
let redun = redundant::Connection::new();
let redun = redundant::Connection::<
Box<dyn SendRequest<RequestMessage<Vec<u8>>> + Send + Sync>,
>::new();

// Add the previously created transports.
redun.add(Box::new(udptcp_conn.clone()));
redun.add(Box::new(tcp_conn.clone()));
redun.add(Box::new(tls_conn.clone()));
redun.add(Box::new(udptcp_conn.clone()) as _);
redun.add(Box::new(tcp_conn.clone()) as _);
redun.add(Box::new(tls_conn.clone()) as _);

// Start a few queries.
for i in 1..10 {
Expand Down
10 changes: 6 additions & 4 deletions examples/query-routing.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use domain::base::Name;
use domain::net::client::protocol::{TcpConnect, UdpConnect};
use domain::net::client::request::RequestMessage;
use domain::net::client::request::{RequestMessage, SendRequest};
use domain::net::client::{dgram_stream, redundant};
use domain::net::server::adapter::{
ClientTransportToSingleService, SingleServiceToService,
Expand Down Expand Up @@ -87,22 +87,24 @@ async fn main() {
async fn example_redundant(
dst1: &str,
dst2: &str,
) -> redundant::Connection<RequestMessage<Vec<u8>>> {
) -> redundant::Connection<
Box<dyn SendRequest<RequestMessage<Vec<u8>>> + Send + Sync>,
> {
let redun = redundant::Connection::new();
let server_addr = SocketAddr::new(IpAddr::from_str(dst1).unwrap(), 53);
let udp_connect = UdpConnect::new(server_addr);
let tcp_connect = TcpConnect::new(server_addr);
let (conn, transport) =
dgram_stream::Connection::new(udp_connect, tcp_connect);
tokio::spawn(transport.run());
redun.add(Box::new(conn));
redun.add(Box::new(conn) as _);
let server_addr = SocketAddr::new(IpAddr::from_str(dst2).unwrap(), 53);
let udp_connect = UdpConnect::new(server_addr);
let tcp_connect = TcpConnect::new(server_addr);
let (conn, transport) =
dgram_stream::Connection::new(udp_connect, tcp_connect);
tokio::spawn(transport.run());
redun.add(Box::new(conn));
redun.add(Box::new(conn) as _);

redun
}
75 changes: 51 additions & 24 deletions src/net/client/redundant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ impl Config {
//------------ Connection ----------------------------------------------------

/// A request multiplexer over redundant transports.
#[derive(Default, Debug)]
pub struct Connection<Req> {
pub struct Connection<Conn> {
/// Configuration for the transport.
config: Config,

Expand All @@ -114,10 +113,10 @@ pub struct Connection<Req> {
///
/// Within each transport, runtime statistics can be modified without
/// locking the entire list for mutation.
transports: RwLock<Vec<Arc<Transport<Req>>>>,
transports: RwLock<Vec<Arc<Transport<Conn>>>>,
}

impl<Req> Connection<Req> {
impl<Conn> Connection<Conn> {
/// Construct a new [`Connection`].
pub fn new() -> Self {
Self::with_config(Config::default())
Expand All @@ -132,7 +131,7 @@ impl<Req> Connection<Req> {
}

/// Add a new transport.
pub fn add(&self, transport: Box<dyn SendRequest<Req> + Send + Sync>) {
pub fn add(&self, transport: Conn) {
// Prepare the new transport.
let transport = Arc::new(Transport::new(transport));

Expand All @@ -141,8 +140,10 @@ impl<Req> Connection<Req> {
}
}

impl<Req: Clone + Send + Sync + 'static> SendRequest<Req>
for Connection<Req>
impl<Conn, Req> SendRequest<Req> for Connection<Conn>
where
Conn: SendRequest<Req> + Send + Sync + 'static,
Req: Clone + Send + Sync + 'static,
{
fn send_request(
&self,
Expand Down Expand Up @@ -188,17 +189,21 @@ impl<Req: Clone + Send + Sync + 'static> SendRequest<Req>
}
}

impl<Req: Clone> Connection<Req> {
impl<Conn> Connection<Conn> {
/// Multiplex a request through known transports.
///
/// The given list of transports will be queried, in order. If a request
/// does not finish within the associated timeout (which should be quite
/// rare), a request for the next transport is started concurrently.
async fn request(
async fn request<Req>(
config: Config,
transports: Vec<RequestTransport<Req>>,
transports: Vec<RequestTransport<Conn>>,
request: Req,
) -> Result<Message<Bytes>, Error> {
) -> Result<Message<Bytes>, Error>
where
Conn: SendRequest<Req>,
Req: Clone + Send + Sync + 'static,
{
// Ensure at least one transport is available.
if transports.is_empty() {
return Err(Error::NoTransportAvailable);
Expand Down Expand Up @@ -252,7 +257,7 @@ impl<Req: Clone> Connection<Req> {
/// The live set of transports will be snapshotted and sorted by timeout.
/// Occasionally, a slower transport may be assigned a low timeout so that
/// information about it can be updated.
fn prep_transports(&self) -> Vec<RequestTransport<Req>> {
fn prep_transports(&self) -> Vec<RequestTransport<Conn>> {
// Take a snapshot of the transport list.
let mut transports = self
.transports
Expand Down Expand Up @@ -289,33 +294,55 @@ impl<Req: Clone> Connection<Req> {
}
}

impl<Conn> Default for Connection<Conn> {
fn default() -> Self {
Self {
config: Default::default(),
transports: Default::default(),
}
}
}

impl<Conn> fmt::Debug for Connection<Conn> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection")
.field("config", &self.config)
.field("transports", &self.transports)
.finish()
}
}

//------------ Transport -----------------------------------------------------

/// A transport known to [`Connection`].
struct Transport<Req> {
/// The underlying request sender.
sender: Box<dyn SendRequest<Req> + Send + Sync>,

struct Transport<Conn> {
/// Statistics about this transport.
///
/// This is updated after every request-response using this transport.
stats: Mutex<TransportStats>,

/// The underlying transport.
inner: Conn,
}

impl<Req> Transport<Req> {
impl<Conn> Transport<Conn> {
/// Construct a new [`Transport`].
pub fn new(sender: Box<dyn SendRequest<Req> + Send + Sync>) -> Self {
pub fn new(inner: Conn) -> Self {
Self {
sender,
stats: Default::default(),
inner,
}
}

/// Query this transport.
async fn request(
async fn request<Req>(
self: Arc<Self>,
request: Req,
) -> Result<Message<Bytes>, Error> {
) -> Result<Message<Bytes>, Error>
where
Conn: SendRequest<Req>,
Req: Clone + Send + Sync + 'static,
{
/// A drop guard for collecting statistics.
struct Guard<'a> {
/// Whether the request actually finished.
Expand Down Expand Up @@ -348,7 +375,7 @@ impl<Req> Transport<Req> {
};

// Perform the actual request.
let result = self.sender.send_request(request).get_response().await;
let result = self.inner.send_request(request).get_response().await;

// Inform the drop guard that the request completed.
guard.finished = true;
Expand All @@ -357,10 +384,10 @@ impl<Req> Transport<Req> {
}
}

impl<Req> fmt::Debug for Transport<Req> {
impl<Conn> fmt::Debug for Transport<Conn> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Transport")
.field("sender", &format_args!("_"))
.field("inner", &format_args!("_"))
.field("stats", &self.stats)
.finish()
}
Expand Down
18 changes: 12 additions & 6 deletions src/resolv/stub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,17 @@ pub mod conf;
/// [`run_with_conf`]: #method.run_with_conf
#[derive(Debug)]
pub struct StubResolver {
transport:
Mutex<Option<Arc<redundant::Connection<RequestMessage<Vec<u8>>>>>>,
transport: Mutex<Option<Arc<redundant::Connection<InnerTransport>>>>,

/// Resolver options.
options: ResolvOptions,

servers: Vec<ServerConf>,
}

type InnerTransport =
Box<dyn SendRequest<RequestMessage<Vec<u8>>> + Send + Sync>;

impl StubResolver {
/// Creates a new resolver using the system’s default configuration.
pub fn new() -> Self {
Expand Down Expand Up @@ -139,7 +141,7 @@ impl StubResolver {
CR: Clone + Debug + ComposeRequest + Send + Sync + 'static,
>(
&self,
) -> redundant::Connection<CR> {
) -> redundant::Connection<Box<dyn SendRequest<CR> + Send + Sync>> {
// Create a redundant transport and fill it with the right transports
let redun = redundant::Connection::new();

Expand All @@ -160,15 +162,15 @@ impl StubResolver {
multi_stream::Connection::new(TcpConnect::new(s.addr));
// Start the run function on a separate task.
fut_list_tcp.push(tran.run());
redun.add(Box::new(conn));
redun.add(Box::new(conn) as _);
} else {
let udp_connect = UdpConnect::new(s.addr);
let tcp_connect = TcpConnect::new(s.addr);
let (conn, tran) =
dgram_stream::Connection::new(udp_connect, tcp_connect);
// Start the run function on a separate task.
fut_list_udp_tcp.push(tran.run());
redun.add(Box::new(conn));
redun.add(Box::new(conn) as _);
}
}

Expand All @@ -181,7 +183,11 @@ impl StubResolver {

async fn get_transport(
&self,
) -> Arc<redundant::Connection<RequestMessage<Vec<u8>>>> {
) -> Arc<
redundant::Connection<
Box<dyn SendRequest<RequestMessage<Vec<u8>>> + Send + Sync>,
>,
> {
let mut opt_transport = self.transport.lock().await;

match &*opt_transport {
Expand Down

0 comments on commit 4ffcbc5

Please sign in to comment.