Skip to content

Commit

Permalink
Tests are working
Browse files Browse the repository at this point in the history
  • Loading branch information
umgefahren committed Dec 12, 2023
1 parent fee9de1 commit faa694a
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 124 deletions.
3 changes: 2 additions & 1 deletion protocols/autonatv2/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod behaviour;
mod handler;

pub use behaviour::Behaviour;
pub use behaviour::{Behaviour, Report};
pub use handler::dial_request::StatusUpdate;
17 changes: 15 additions & 2 deletions protocols/autonatv2/src/client/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ use rand_core::{OsRng, RngCore};

use crate::{global_only::IpExt, request_response::DialRequest};

use super::handler::{dial_back, dial_request, Handler, TestEnd};
use super::handler::{
dial_back,
dial_request::{self, StatusUpdate},
Handler, TestEnd,
};

struct IntervalTicker {
interval: Duration,
Expand Down Expand Up @@ -49,6 +53,12 @@ impl Default for Config {
}
}

#[derive(Debug)]
pub struct Report {
pub update: StatusUpdate,
pub peer_id: PeerId,
}

pub struct Behaviour<R = OsRng>
where
R: RngCore + 'static,
Expand Down Expand Up @@ -76,7 +86,7 @@ where
{
type ConnectionHandler = Handler;

type ToSwarm = ();
type ToSwarm = Report;

fn handle_established_inbound_connection(
&mut self,
Expand Down Expand Up @@ -209,6 +219,9 @@ where
Either::Left(dial_request::ToBehaviour::TestCompleted(Err(err))) => {
tracing::debug!("Test failed: {:?}", err);
}
Either::Left(dial_request::ToBehaviour::StatusUpdate(update)) => self
.pending_events
.push_back(ToSwarm::GenerateEvent(Report { update, peer_id })),
}
}

Expand Down
47 changes: 41 additions & 6 deletions protocols/autonatv2/src/client/handler/dial_request.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use futures::{channel::oneshot, AsyncRead, AsyncWrite, AsyncWriteExt};
use futures::{
channel::{mpsc, oneshot},
AsyncRead, AsyncWrite, AsyncWriteExt, SinkExt, StreamExt,

Check failure on line 3 in protocols/autonatv2/src/client/handler/dial_request.rs

View workflow job for this annotation

GitHub Actions / Test libp2p-autonatv2

unused import: `AsyncWriteExt`
};
use futures_bounded::FuturesSet;
use libp2p_core::{
upgrade::{DeniedUpgrade, ReadyUpgrade},
Expand Down Expand Up @@ -58,7 +61,7 @@ pub enum Error {
#[error("server experienced failure during dial back on address: {addr:?}")]
FailureDuringDialBack { addr: Option<Multiaddr> },
#[error("error during substream upgrad")]
SubstreamError(
Substream(
#[from] StreamUpgradeError<<ReadyUpgrade<StreamProtocol> as OutboundUpgradeSend>::Error>,
),
}
Expand All @@ -73,9 +76,16 @@ pub struct TestEnd {
#[derive(Debug)]
pub enum ToBehaviour {
TestCompleted(Result<TestEnd, Error>),
StatusUpdate(StatusUpdate),
PeerHasServerSupport,
}

#[derive(Debug)]
pub enum StatusUpdate {
GotDialDataReq { addr: Multiaddr, num_bytes: usize },
CompletedDialData { addr: Multiaddr, num_bytes: usize },
}

#[derive(Debug)]
pub enum FromBehaviour {
PerformRequest(DialRequest),
Expand All @@ -98,14 +108,19 @@ pub struct Handler {
>,
>,
>,
status_update_rx: mpsc::Receiver<StatusUpdate>,
status_update_tx: mpsc::Sender<StatusUpdate>,
}

impl Handler {
pub(crate) fn new() -> Self {
let (status_update_tx, status_update_rx) = mpsc::channel(10);
Self {
queued_events: VecDeque::new(),
outbound: FuturesSet::new(DEFAULT_TIMEOUT, MAX_CONCURRENT_REQUESTS),
queued_streams: VecDeque::default(),
status_update_tx,
status_update_rx,
}
}

Expand All @@ -118,7 +133,11 @@ impl Handler {
});
if self
.outbound
.try_push(start_substream_handle(req, rx))
.try_push(start_substream_handle(
req,
rx,
self.status_update_tx.clone(),
))
.is_err()
{
tracing::debug!("Dial request dropped, too many requests in flight");
Expand Down Expand Up @@ -157,6 +176,11 @@ impl ConnectionHandler for Handler {
ToBehaviour::TestCompleted(m.map_err(Error::Timeout).and_then(identity)),
));
}
if let Poll::Ready(Some(status_update)) = self.status_update_rx.poll_next_unpin(cx) {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
ToBehaviour::StatusUpdate(status_update),
));
}
Poll::Pending
}

Expand Down Expand Up @@ -226,9 +250,10 @@ async fn start_substream_handle(
StreamUpgradeError<<ReadyUpgrade<StreamProtocol> as OutboundUpgradeSend>::Error>,
>,
>,
status_update_tx: mpsc::Sender<StatusUpdate>,
) -> Result<TestEnd, Error> {
match substream_recv.await {
Ok(Ok(substream)) => handle_substream(dial_request, substream).await,
Ok(Ok(substream)) => handle_substream(dial_request, substream, status_update_tx).await,
Ok(Err(err)) => Err(Error::from(err)),
Err(_) => Err(Error::InternalServer),
}
Expand All @@ -237,6 +262,7 @@ async fn start_substream_handle(
async fn handle_substream(
dial_request: DialRequest,
substream: impl AsyncRead + AsyncWrite + Unpin,
mut status_update_tx: mpsc::Sender<StatusUpdate>,
) -> Result<TestEnd, Error> {
let mut coder = Coder::new(substream);
coder
Expand Down Expand Up @@ -267,20 +293,29 @@ async fn handle_substream(
min: DATA_LEN_LOWER_BOUND,
});
}
match dial_request.addrs.get(addr_idx) {
let addr = match dial_request.addrs.get(addr_idx).cloned() {
Some(addr) => {
tracing::trace!("the address {addr} is suspicious to the server, sending {num_bytes} bytes of data");
suspicious_addr.push(addr.clone());
addr
}
None => {
return Err(Error::InvalidReferencedAddress {
index: addr_idx,
max: dial_request.addrs.len(),
});
}
}
};

let _ = status_update_tx
.send(StatusUpdate::GotDialDataReq {
addr: addr.clone(),
num_bytes,
})
.await;
let status_update = StatusUpdate::CompletedDialData { addr, num_bytes };
send_aap_data(&mut coder, num_bytes).await?;
let _ = status_update_tx.send(status_update).await;
}
Response::Dial(dial_response) => {
coder.close().await?;
Expand Down
15 changes: 0 additions & 15 deletions protocols/autonatv2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,3 @@ pub(crate) const DIAL_BACK_UPGRADE: ReadyUpgrade<StreamProtocol> =
ReadyUpgrade::new(DIAL_BACK_PROTOCOL_NAME);

pub type Nonce = u64;

pub fn add(left: usize, right: usize) -> usize {
left + right
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}
50 changes: 26 additions & 24 deletions protocols/autonatv2/src/server/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,19 @@ where
..
}) => {
for (addr, _) in pairs.iter() {
if let Some((_, p)) = self.dialing_dial_back.keys().find(|(a, _)| a == addr) {
let cmds = self.dialing_dial_back.remove(&(addr.clone(), *p)).unwrap();
for cmd in cmds {
let _ = cmd.back_channel.send(DialBack::Dial);
let cleaned_addr: Multiaddr = addr
.iter()
.filter(|p| !matches!(p, Protocol::P2p(_)))
.collect();
let peer_id_opt = addr.iter().find_map(|p| match p {
Protocol::P2p(peer) => Some(peer),
_ => None,
});
if let Some(peer_id) = peer_id_opt {
if let Some(cmd) = self.dialing_dial_back.remove(&(cleaned_addr, peer_id)) {
for cmd in cmd {
let _ = cmd.back_channel.send(DialBack::Dial);
}
}
}
}
Expand All @@ -167,27 +176,20 @@ where
handler: NotifyHandler::One(*connection_id),
event: Either::Left(cmd),
});
} else if let Some(pending) =
self.dialing_dial_back.get_mut(&(addr.clone(), peer_id))
{
pending.push_back(cmd);
} else {
if let Some(pending) =
self.dialing_dial_back.get_mut(&(addr.clone(), peer_id))
{
pending.push_back(cmd);
} else {
self.pending_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(peer_id)
.addresses(Vec::from([addr.clone()]))
.condition(PeerCondition::Always)
.allocate_new_port()
.build(),
/* opts: DialOpts::unknown_peer_id()
.address(addr.clone())
.allocate_new_port()
.build(),
*/
});
self.dialing_dial_back
.insert((addr, peer_id), VecDeque::from([cmd]));
}
self.pending_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(peer_id)
.addresses(Vec::from([addr.clone()]))
.condition(PeerCondition::Always)
.allocate_new_port()
.build(),
});
self.dialing_dial_back
.insert((addr, peer_id), VecDeque::from([cmd]));
}
}
Err(e) => {
Expand Down
Loading

0 comments on commit faa694a

Please sign in to comment.