Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform the multi-stream connections handshake inside smoldot #2755

Merged
merged 3 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions bin/light-base/src/network_service/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,10 @@ pub(super) async fn connection_task<TPlat: Platform>(
.pending_outcome_ok_single_stream(start_connect.id);
(id, either::Left((socket, task)))
}
PlatformConnection::MultiStream(socket, peer_id) => {
let (id, task) = guarded.network.pending_outcome_ok_multi_stream(
start_connect.id,
TPlat::now(),
&peer_id,
);

PlatformConnection::MultiStream(socket) => {
let (id, task) = guarded
.network
.pending_outcome_ok_multi_stream(start_connect.id);
(id, either::Right((socket, task)))
}
};
Expand Down
3 changes: 1 addition & 2 deletions bin/light-base/src/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use alloc::string::String;
use core::{ops, str, time::Duration};
use futures::prelude::*;
use smoldot::libp2p::peer_id::PeerId;

pub mod async_std;

Expand Down Expand Up @@ -136,7 +135,7 @@ pub enum PlatformConnection<TStream, TConnection> {
SingleStream(TStream),
/// The connection is made of multiple substreams. The encryption and multiplexing are handled
/// externally.
MultiStream(TConnection, PeerId),
MultiStream(TConnection),
}

/// Direction in which a substream has been opened. See [`Platform::next_substream`].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export interface ConnectionConfig {
*
* Must only be called once per connection.
*/
onOpen: (info: { type: 'single-stream' } | { type: 'multi-stream', peerId: Uint8Array }) => void;
onOpen: (info: { type: 'single-stream' } | { type: 'multi-stream' }) => void;

/**
* Callback called when the connection transitions to the `Closed` state.
Expand Down Expand Up @@ -328,9 +328,7 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports,
break
}
case 'multi-stream': {
const ptr = instance.exports.alloc(info.peerId.length) >>> 0;
new Uint8Array(instance.exports.memory.buffer).set(info.peerId, ptr);
instance.exports.connection_open_multi_stream(connectionId, ptr, info.peerId.length);
instance.exports.connection_open_multi_stream(connectionId);
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion bin/wasm-node/javascript/src/instance/bindings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export interface SmoldotWasmExports extends WebAssembly.Exports {
database_content: (chainId: number, maxSize: number) => void,
timer_finished: (timerId: number) => void,
connection_open_single_stream: (connectionId: number) => void,
connection_open_multi_stream: (connectionId: number, peerIdPtr: number, peerIdLen: number) => void,
connection_open_multi_stream: (connectionId: number) => void,
stream_message: (connectionId: number, streamId: number, ptr: number, len: number) => void,
connection_stream_opened: (connectionId: number, streamId: number, outbound: number) => void,
connection_closed: (connectionId: number, ptr: number, len: number) => void,
Expand Down
12 changes: 1 addition & 11 deletions bin/wasm-node/rust/src/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,24 +465,14 @@ pub extern "C" fn connection_open_single_stream(connection_id: u32) {
///
/// See also [`connection_new`].
///
/// The API user is responsible for determining the identity of the remote as part of the opening
/// process of the connection. This identity must then be provided in the form of the binary
/// representation of a peer ID, through `peer_id_ptr` and `peer_id_len`.
/// See <https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#peer-ids> for a
/// definition of the binary representation of a peer ID.
/// The buffer **must** have been allocated with [`alloc`]. It is freed when this function is
/// called.
///
/// When in the `Open` state, the connection can receive messages. When a message is received,
/// [`alloc`] must be called in order to allocate memory for this message, then
/// [`stream_message`] must be called with the pointer returned by [`alloc`].
#[no_mangle]
pub extern "C" fn connection_open_multi_stream(
connection_id: u32,
peer_id_ptr: u32,
peer_id_len: u32,
) {
crate::platform::connection_open_multi_stream(connection_id, peer_id_ptr, peer_id_len)
crate::platform::connection_open_multi_stream(connection_id)
}

/// Notify of a message being received on the stream. The connection associated with that stream
Expand Down
19 changes: 1 addition & 18 deletions bin/wasm-node/rust/src/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,12 @@ impl smoldot_light::platform::Platform for Platform {
))
}
ConnectionInner::MultiStream {
peer_id,
connection_handles_alive,
..
} => {
*connection_handles_alive += 1;
Ok(smoldot_light::platform::PlatformConnection::MultiStream(
ConnectionWrapper(connection_id),
peer_id.clone(),
))
}
ConnectionInner::Closed {
Expand Down Expand Up @@ -460,8 +458,6 @@ enum ConnectionInner {
NotOpen,
SingleStream,
MultiStream {
/// Peer id we're connected to.
peer_id: smoldot_light::PeerId,
/// List of substreams that the host (i.e. JavaScript side) has reported have been opened,
/// but that haven't been reported through
/// [`smoldot_light::platform::Platform::next_substream`] yet.
Expand Down Expand Up @@ -522,24 +518,11 @@ pub(crate) fn connection_open_single_stream(connection_id: u32) {
connection.something_happened.notify(usize::max_value());
}

pub(crate) fn connection_open_multi_stream(connection_id: u32, peer_id_ptr: u32, peer_id_len: u32) {
let peer_id = {
let peer_id_ptr = usize::try_from(peer_id_ptr).unwrap();
let peer_id_len = usize::try_from(peer_id_len).unwrap();
let bytes: Box<[u8]> = unsafe {
Box::from_raw(slice::from_raw_parts_mut(
peer_id_ptr as *mut u8,
peer_id_len,
))
};
smoldot_light::PeerId::from_bytes(bytes.into()).unwrap()
};

pub(crate) fn connection_open_multi_stream(connection_id: u32) {
let mut lock = STATE.try_lock().unwrap();
let connection = lock.connections.get_mut(&connection_id).unwrap();
debug_assert!(matches!(connection.inner, ConnectionInner::NotOpen));
connection.inner = ConnectionInner::MultiStream {
peer_id,
opened_substreams_to_pick_up: VecDeque::with_capacity(8),
connection_handles_alive: 0,
};
Expand Down
6 changes: 4 additions & 2 deletions src/libp2p/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,9 @@ where

/// Adds a new multi-stream connection to the collection.
///
/// Note that no [`Event::HandshakeFinished`] event will be generated. The connection is
/// immediately considered as fully established.
/// Must be passed the moment (as a `TNow`) when the connection as been established, in order
/// to determine when the handshake timeout expires.
// TODO: add an is_initiator parameter? right now we're always implicitly the initiator
melekes marked this conversation as resolved.
Show resolved Hide resolved
pub fn insert_multi_stream<TSubId>(
&mut self,
now: TNow,
Expand All @@ -394,6 +395,7 @@ where
self.randomness_seeds.gen(),
now,
self.max_inbound_substreams,
self.noise_key.clone(),
self.notification_protocols.clone(),
self.request_response_protocols.clone(),
self.ping_protocol.clone(),
Expand Down
Loading