Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor WebRTC code #51

Merged
merged 10 commits into from
Apr 17, 2024
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ simple-dns = "0.5.3"
smallvec = "1.10.0"
snow = { version = "0.9.3", features = ["ring-resolver"], default-features = false }
socket2 = { version = "0.5.5", features = ["all"] }
str0m = "0.2.0"
str0m = "0.4.1"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
str0m = "0.4.1"
str0m = "0.5.0"

The str0m blocker has been fixed (algesten/str0m@7ed5143) and is included in the 0.5.0 release. I think we should be using the latest version since it includes a few other fixes https://github.com/algesten/str0m/blob/main/CHANGELOG.md#050

thiserror = "1.0.39"
tokio-stream = "0.1.12"
tokio-tungstenite = { version = "0.20.0", features = ["rustls-tls-native-roots"] }
Expand Down
180 changes: 159 additions & 21 deletions src/multistream_select/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use crate::{
codec::unsigned_varint::UnsignedVarint,
error::{self, Error},
multistream_select::{
protocol::{HeaderLine, Message, MessageIO, Protocol, ProtocolError},
protocol::{
encode_multistream_message, HeaderLine, Message, MessageIO, Protocol, ProtocolError,
},
Negotiated, NegotiationError, Version,
},
types::protocol::ProtocolName,
Expand Down Expand Up @@ -224,7 +226,7 @@ where
}

/// `multistream-select` handshake result for dialer.
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub enum HandshakeResult {
/// Handshake is not complete, data missing.
NotReady,
Expand Down Expand Up @@ -259,7 +261,6 @@ pub struct DialerState {
state: HandshakeState,
}

// TODO: tests
impl DialerState {
/// Propose protocol to remote peer.
///
Expand All @@ -269,29 +270,22 @@ impl DialerState {
protocol: ProtocolName,
fallback_names: Vec<ProtocolName>,
) -> crate::Result<(Self, Vec<u8>)> {
// encode `/multistream-select/1.0.0` header
let mut bytes = BytesMut::with_capacity(64);
let message = Message::Header(HeaderLine::V1);
let _ = message.encode(&mut bytes).map_err(|_| Error::InvalidData)?;
let mut header = UnsignedVarint::encode(bytes)?;

// encode proposed protocol
let mut proto_bytes = BytesMut::with_capacity(512);
let message = Message::Protocol(Protocol::try_from(protocol.as_bytes()).unwrap());
let _ = message.encode(&mut proto_bytes).map_err(|_| Error::InvalidData)?;
let proto_bytes = UnsignedVarint::encode(proto_bytes)?;

// TODO: add fallback names

header.append(&mut proto_bytes.into());
let message = encode_multistream_message(
std::iter::once(protocol.clone())
.chain(fallback_names.clone())
.filter_map(|protocol| Protocol::try_from(protocol.as_ref()).ok())
.map(|protocol| Message::Protocol(protocol)),
)?
Comment on lines +273 to +278
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I probably don't understand something, but as per multistream-select spec, protocols are tried one-by-one. Why are we encoding all protocols (including fallback protocols) here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have to check this but this works with smoldot, both for listener and dialer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. It looks like an incorrect way of handling the handshake and litep2p would reject this message as well but smoldot both sends and accepts Message::Protocols. I'm kind of tempted to keep it as is since it works but I'll check if smoldot would work with Message::Protocol

.freeze()
.to_vec();

Ok((
Self {
protocol,
fallback_names,
state: HandshakeState::WaitingResponse,
},
header,
message,
))
}

Expand Down Expand Up @@ -328,10 +322,9 @@ impl DialerState {
return Ok(HandshakeResult::Succeeded(self.protocol.clone()));
}

// TODO: zzz
for fallback in &self.fallback_names {
if fallback.as_bytes() == protocol.as_ref() {
return Ok(HandshakeResult::Succeeded(self.protocol.clone()));
return Ok(HandshakeResult::Succeeded(fallback.clone()));
}
}

Expand All @@ -346,3 +339,148 @@ impl DialerState {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn propose() {
let (mut dialer_state, message) =
DialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap();
let message = bytes::BytesMut::from(&message[..]).freeze();

let Message::Protocols(protocols) = Message::decode(message).unwrap() else {
panic!("invalid message type");
};

assert_eq!(protocols.len(), 2);
assert_eq!(
protocols[0],
Protocol::try_from(&b"/multistream/1.0.0"[..])
.expect("valid multitstream-select header")
);
assert_eq!(
protocols[1],
Protocol::try_from(&b"/13371338/proto/1"[..])
.expect("valid multitstream-select header")
);
}

#[test]
fn propose_with_fallback() {
let (mut dialer_state, message) = DialerState::propose(
ProtocolName::from("/13371338/proto/1"),
vec![ProtocolName::from("/sup/proto/1")],
)
.unwrap();
let message = bytes::BytesMut::from(&message[..]).freeze();

let Message::Protocols(protocols) = Message::decode(message).unwrap() else {
panic!("invalid message type");
};

assert_eq!(protocols.len(), 3);
assert_eq!(
protocols[0],
Protocol::try_from(&b"/multistream/1.0.0"[..])
.expect("valid multitstream-select header")
);
assert_eq!(
protocols[1],
Protocol::try_from(&b"/13371338/proto/1"[..])
.expect("valid multitstream-select header")
);
assert_eq!(
protocols[2],
Protocol::try_from(&b"/sup/proto/1"[..]).expect("valid multitstream-select header")
);
}

#[test]
fn register_response_invalid_message() {
// send only header line
let mut bytes = BytesMut::with_capacity(32);
let message = Message::Header(HeaderLine::V1);
let _ = message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();

let (mut dialer_state, _message) =
DialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap();

match dialer_state.register_response(bytes.freeze().to_vec()) {
Err(Error::NegotiationError(error::NegotiationError::MultistreamSelectError(
NegotiationError::Failed,
))) => {}
event => panic!("invalid event: {event:?}"),
}
}

#[test]
fn header_line_missing() {
// header line missing
let mut bytes = BytesMut::with_capacity(256);
let message = Message::Protocols(vec![
Protocol::try_from(&b"/13371338/proto/1"[..]).unwrap(),
Protocol::try_from(&b"/sup/proto/1"[..]).unwrap(),
]);
let _ = message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();

let (mut dialer_state, _message) =
DialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap();

match dialer_state.register_response(bytes.freeze().to_vec()) {
Err(Error::NegotiationError(error::NegotiationError::MultistreamSelectError(
NegotiationError::Failed,
))) => {}
event => panic!("invalid event: {event:?}"),
}
}

#[test]
fn negotiate_main_protocol() {
let message = encode_multistream_message(
vec![Message::Protocol(
Protocol::try_from(&b"/13371338/proto/1"[..]).unwrap(),
)]
.into_iter(),
)
.unwrap()
.freeze();

let (mut dialer_state, _message) = DialerState::propose(
ProtocolName::from("/13371338/proto/1"),
vec![ProtocolName::from("/sup/proto/1")],
)
.unwrap();

match dialer_state.register_response(message.to_vec()) {
Ok(HandshakeResult::Succeeded(negotiated)) =>
assert_eq!(negotiated, ProtocolName::from("/13371338/proto/1")),
_ => panic!("invalid event"),
}
}

#[test]
fn negotiate_fallback_protocol() {
let message = encode_multistream_message(
vec![Message::Protocol(
Protocol::try_from(&b"/sup/proto/1"[..]).unwrap(),
)]
.into_iter(),
)
.unwrap()
.freeze();

let (mut dialer_state, _message) = DialerState::propose(
ProtocolName::from("/13371338/proto/1"),
vec![ProtocolName::from("/sup/proto/1")],
)
.unwrap();

match dialer_state.register_response(message.to_vec()) {
Ok(HandshakeResult::Succeeded(negotiated)) =>
assert_eq!(negotiated, ProtocolName::from("/sup/proto/1")),
_ => panic!("invalid event"),
}
}
}
Loading
Loading