Skip to content

Commit

Permalink
net: channel hosts a pointer to Session
Browse files Browse the repository at this point in the history
solved a bug which warned Session could not be made into an object.

documented here: rust-lang/rust#51443
and here: https://stackoverflow.com/questions/72838225/rust-trait-warning-method-references-the-self-type-in-its-where-clause
  • Loading branch information
lunar-mining committed Jul 2, 2022
1 parent 7e540d0 commit af2ffd4
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 36 deletions.
9 changes: 5 additions & 4 deletions src/net/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ pub type AcceptorPtr = Arc<Acceptor>;
pub struct Acceptor {
channel_subscriber: SubscriberPtr<Result<ChannelPtr>>,
task: StoppableTaskPtr,
//pub session: Mutex<Option<SessionWeakPtr>>,
pub session: Mutex<Option<SessionWeakPtr>>,
}

impl Acceptor {
/// Create new Acceptor object.
pub fn new() -> Arc<Self> {
pub fn new(session: Mutex<Option<SessionWeakPtr>>) -> Arc<Self> {
Arc::new(Self {
channel_subscriber: Subscriber::new(),
task: StoppableTask::new(),
//session,
session,
})
}
/// Start accepting inbound socket connections. Creates a listener to start
Expand Down Expand Up @@ -148,7 +148,8 @@ impl Acceptor {
loop {
match listener.next().await {
Ok((stream, url)) => {
let channel = Channel::new(stream, url).await;
let channel =
Channel::new(stream, url, self.session.lock().await.clone().unwrap()).await;
self.channel_subscriber.notify(Ok(channel)).await;
}
Err(e) => {
Expand Down
6 changes: 3 additions & 3 deletions src/net/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub struct Channel {
receive_task: StoppableTaskPtr,
stopped: Mutex<bool>,
info: Mutex<ChannelInfo>,
//session: SessionWeakPtr,
session: SessionWeakPtr,
}

impl Channel {
Expand All @@ -78,7 +78,7 @@ impl Channel {
pub async fn new(
stream: Box<dyn TransportStream>,
address: Url,
//session: SessionWeakPtr,
session: SessionWeakPtr,
) -> Arc<Self> {
let (reader, writer) = stream.split();
let reader = Mutex::new(reader);
Expand All @@ -96,7 +96,7 @@ impl Channel {
receive_task: StoppableTask::new(),
stopped: Mutex::new(false),
info: Mutex::new(ChannelInfo::new()),
//session,
session,
})
}

Expand Down
15 changes: 8 additions & 7 deletions src/net/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ use super::{
/// Create outbound socket connections.
pub struct Connector {
settings: SettingsPtr,
//pub session: SessionWeakPtr,
pub session: SessionWeakPtr,
}

impl Connector {
/// Create a new connector with default network settings.
pub fn new(settings: SettingsPtr, // session: SessionWeakPtr
) -> Self {
Self { settings, // session
}
pub fn new(settings: SettingsPtr, session: SessionWeakPtr) -> Self {
Self { settings, session }
}

/// Establish an outbound connection.
Expand Down Expand Up @@ -58,10 +56,13 @@ impl Connector {

let channel = match $upgrade {
// session
None => Channel::new(Box::new(stream?), connect_url.clone()).await,
None => {
Channel::new(Box::new(stream?), connect_url.clone(), self.session.clone())
.await
}
Some(u) if u == "tls" => {
let stream = $transport.upgrade_dialer(stream?)?.await;
Channel::new(Box::new(stream?), connect_url).await
Channel::new(Box::new(stream?), connect_url, self.session.clone()).await
}
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
};
Expand Down
2 changes: 1 addition & 1 deletion src/net/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl P2p {
let parent = Arc::downgrade(&self_);

*self_.session_manual.lock().await = Some(ManualSession::new(parent.clone()));
*self_.session_inbound.lock().await = Some(InboundSession::new(parent.clone()));
*self_.session_inbound.lock().await = Some(InboundSession::new(parent.clone()).await);
*self_.session_outbound.lock().await = Some(OutboundSession::new(parent));

register_default_protocols(self_.clone()).await;
Expand Down
9 changes: 4 additions & 5 deletions src/net/session/inbound_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ pub struct InboundSession {

impl InboundSession {
/// Create a new inbound session.
pub fn new(p2p: Weak<P2p>) -> Arc<Self> {
//let acceptor = Acceptor::new(Mutex::new(None));
let acceptor = Acceptor::new();
pub async fn new(p2p: Weak<P2p>) -> Arc<Self> {
let acceptor = Acceptor::new(Mutex::new(None));

let self_ = Arc::new(Self {
p2p,
Expand All @@ -48,9 +47,9 @@ impl InboundSession {
connect_infos: Mutex::new(FxHashMap::default()),
});

//let parent = Arc::downgrade(&self_);
let parent = Arc::downgrade(&self_);

//*self_.acceptor.session.lock().await = Some(Arc::new(parent));
*self_.acceptor.session.lock().await = Some(Arc::new(parent));

self_
}
Expand Down
5 changes: 1 addition & 4 deletions src/net/session/manual_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ impl ManualSession {
executor: Arc<Executor<'_>>,
) -> Result<()> {
let parent = Arc::downgrade(&self);
let connector = Connector::new(
self.p2p().settings(),
//Arc::new(parent)
);
let connector = Connector::new(self.p2p().settings(), Arc::new(parent));

let settings = self.p2p().settings();

Expand Down
14 changes: 9 additions & 5 deletions src/net/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) {
pub trait Session: Sync {
/// Registers a new channel with the session. Performs a network handshake
/// and starts the channel.
// if we need to pass Self as an Arc we can do so like this:
// pub trait MyTrait: Send + Sync {
// async fn foo(&self, self_: Arc<dyn MyTrait>) {}
// }
async fn register_channel(
self_: Arc<dyn Session>,
&self,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
Expand All @@ -87,14 +91,14 @@ pub trait Session: Sync {
// We do this so that the protocols can begin receiving and buffering messages
// while the handshake protocol is ongoing.
// They are currently in sleep mode.
let p2p = self_.p2p();
let p2p = self.p2p();
let protocols =
p2p.protocol_registry().attach(self_.type_id(), channel.clone(), p2p.clone()).await;
p2p.protocol_registry().attach(self.type_id(), channel.clone(), p2p.clone()).await;

// Perform the handshake protocol
let protocol_version = ProtocolVersion::new(channel.clone(), self_.p2p().settings()).await;
let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await;
let handshake_task =
self_.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());

// Switch on the channel
channel.start(executor.clone());
Expand Down
5 changes: 1 addition & 4 deletions src/net/session/outbound_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,7 @@ impl OutboundSession {
) -> Result<()> {
let parent = Arc::downgrade(&self);

let connector = Connector::new(
self.p2p().settings(),
//Arc::new(parent)
);
let connector = Connector::new(self.p2p().settings(), Arc::new(parent));

loop {
let addr = self.load_address(slot_number).await?;
Expand Down
4 changes: 1 addition & 3 deletions src/net/session/seed_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ impl SeedSession {
};

let parent = Arc::downgrade(&self);
let connector = Connector::new(
settings.clone(), //Arc::new(parent)
);
let connector = Connector::new(settings.clone(), Arc::new(parent));
match connector.connect(seed.clone()).await {
Ok(channel) => {
// Blacklist goes here
Expand Down

0 comments on commit af2ffd4

Please sign in to comment.