Skip to content

Commit

Permalink
Merge pull request #54 from nervosnetwork/report-open-proto-failed
Browse files Browse the repository at this point in the history
Report open proto failed
  • Loading branch information
TheWaWaR authored Feb 28, 2019
2 parents 4e1a26d + 23930a8 commit 71de662
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 58 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ description = "Minimal implementation for a multiplexed p2p network framework."
authors = ["piaoliu <441594700@qq.com>", "Nervos Core Dev <dev@nervos.org>"]
repository = "https://github.com/nervosnetwork/p2p"
include = ["Cargo.toml", "src/*", "README.md", "LICENSE"]
readme = "README.md"
keywords = ["network", "peer-to-peer"]
categories = ["network-programming", "asynchronous"]
edition = "2018"
Expand Down
2 changes: 1 addition & 1 deletion src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
};

/// Session context, contains basic information about the current connection
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct SessionContext {
pub(crate) event_sender: mpsc::Sender<SessionEvent>,
/// Session's ID
Expand Down
25 changes: 22 additions & 3 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
traits::{ProtocolMeta, ServiceHandle, ServiceProtocol, SessionProtocol},
utils::{dns::DNSResolver, extract_peer_id, multiaddr_to_socketaddr},
yamux::{session::SessionType, Config as YamuxConfig},
ProtocolId, SessionId, StreamId,
ProtocolId, SessionId,
};

mod control;
Expand Down Expand Up @@ -66,10 +66,18 @@ pub enum ServiceError {
/// error
error: Error<ServiceTask>,
},
/// Protocol select fail
ProtocolSelectError {
/// Protocol name, if none, timeout or other net problem,
/// if Some, don't support this proto
proto_name: Option<String>,
/// Session context
session_context: SessionContext,
},
/// Protocol error during interaction
ProtocolError {
/// Stream id
id: StreamId,
/// Session id
id: SessionId,
/// Protocol id
proto_id: ProtocolId,
/// Codec error
Expand Down Expand Up @@ -1022,6 +1030,17 @@ where
..
} => self.protocol_open(id, proto_id, version),
SessionEvent::ProtocolClose { id, proto_id, .. } => self.protocol_close(id, proto_id),
SessionEvent::ProtocolSelectError { id, proto_name } => {
if let Some(session_context) = self.sessions.get(&id).cloned() {
self.handle.handle_error(
&mut self.service_context,
ServiceError::ProtocolSelectError {
proto_name,
session_context,
},
)
}
}
SessionEvent::ProtocolError {
id,
proto_id,
Expand Down
145 changes: 91 additions & 54 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ use futures::{
use log::{debug, error, trace, warn};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::{error, io, time::Duration};
use tokio::codec::{Decoder, Encoder, Framed, FramedParts};
use std::{
error, io,
time::{Duration, Instant},
};
use tokio::prelude::{AsyncRead, AsyncWrite, FutureExt};
use tokio::{
codec::{Decoder, Encoder, Framed, FramedParts, LengthDelimitedCodec},
timer::Delay,
};

use crate::{
error::Error,
Expand Down Expand Up @@ -95,10 +101,16 @@ pub(crate) enum SessionEvent {
/// Stream id
stream_id: StreamId,
},
ProtocolSelectError {
/// Session id
id: SessionId,
/// proto_name
proto_name: Option<String>,
},
/// Codec error
ProtocolError {
/// Stream id
id: StreamId,
/// Session id
id: SessionId,
/// Protocol id
proto_id: ProtocolId,
/// Codec error
Expand All @@ -114,6 +126,7 @@ pub(crate) struct Session<T, U> {

id: SessionId,
timeout: Duration,
timeout_check: Option<Delay>,

dead: bool,

Expand Down Expand Up @@ -166,6 +179,7 @@ where
protocol_configs: meta.protocol_configs,
id: meta.id,
timeout: meta.timeout,
timeout_check: Some(Delay::new(Instant::now() + meta.timeout)),
ty: meta.ty,
next_stream: 0,
sub_streams: HashMap::default(),
Expand All @@ -181,21 +195,24 @@ where
}
}

/// After the session is established, the client is requested to open some custom protocol sub stream.
pub fn open_proto_stream(&mut self, proto_name: &str) {
debug!("try open proto, {}", proto_name);
/// select procedure
#[inline(always)]
fn select_procedure(
&mut self,
procedure: impl Future<
Item = (
Framed<StreamHandle, LengthDelimitedCodec>,
String,
Option<String>,
),
Error = io::Error,
> + Send
+ 'static,
) {
let event_sender = self.proto_event_sender.clone();
let handle = self.socket.open_stream().unwrap();
let versions = self
.protocol_configs
.get(proto_name)
.unwrap()
.support_versions();
let proto_info = ProtocolInfo::new(&proto_name, versions);

let task = client_select(handle, proto_info)
.and_then(|(handle, name, version)| {
match version {
let task = procedure.timeout(self.timeout).then(|result| {
match result {
Ok((handle, name, version)) => match version {
Some(version) => {
let send_task = event_sender.send(ProtocolEvent::Open {
sub_stream: Box::new(handle),
Expand All @@ -206,18 +223,47 @@ where
error!("stream send back error: {:?}", err);
}));
}
None => debug!("Negotiation to open the protocol {} failed", name),
None => {
debug!("Negotiation to open the protocol {} failed", name);
let send_task = event_sender.send(ProtocolEvent::SelectError {
proto_name: Some(name),
});
tokio::spawn(send_task.map(|_| ()).map_err(|err| {
error!("select error send back error: {:?}", err);
}));
}
},
Err(err) => {
debug!("stream protocol select err: {:?}", err);
let send_task =
event_sender.send(ProtocolEvent::SelectError { proto_name: None });
tokio::spawn(send_task.map(|_| ()).map_err(|err| {
error!("select error send back error: {:?}", err);
}));
}
Ok(())
})
.timeout(self.timeout)
.map_err(|err| {
trace!("stream protocol select err: {:?}", err);
});
}

Ok(())
});

tokio::spawn(task);
}

/// After the session is established, the client is requested to open some custom protocol sub stream.
pub fn open_proto_stream(&mut self, proto_name: &str) {
debug!("try open proto, {}", proto_name);
let handle = self.socket.open_stream().unwrap();
let versions = self
.protocol_configs
.get(proto_name)
.unwrap()
.support_versions();
let proto_info = ProtocolInfo::new(&proto_name, versions);

let task = client_select(handle, proto_info);
self.select_procedure(task);
}

/// Push the generated event to the Service
#[inline]
fn event_output(&mut self, event: SessionEvent) {
Expand Down Expand Up @@ -277,7 +323,6 @@ where

/// Handling client-initiated open protocol sub stream requests
fn handle_sub_stream(&mut self, sub_stream: StreamHandle) {
let event_sender = self.proto_event_sender.clone();
let proto_metas = self
.protocol_configs
.values()
Expand All @@ -288,34 +333,8 @@ where
})
.collect();

let task = server_select(sub_stream, proto_metas)
.and_then(|(handle, name, version)| {
match version {
Some(version) => {
let send_task = event_sender.send(ProtocolEvent::Open {
sub_stream: Box::new(handle),
proto_name: name,
version,
});

tokio::spawn(send_task.map(|_| ()).map_err(|err| {
error!("stream send back error: {:?}", err);
}));
}
None => {
// server close the connect
let _ = handle.into_inner().shutdown();
debug!("negotiation to open the protocol [{}] failed", name);
}
}
Ok(())
})
.timeout(self.timeout)
.map_err(|err| {
trace!("stream protocol select err: {:?}", err);
});

tokio::spawn(task);
let task = server_select(sub_stream, proto_metas);
self.select_procedure(task);
}

/// Handling events uploaded by the protocol stream
Expand Down Expand Up @@ -384,6 +403,12 @@ where
data,
})
}
ProtocolEvent::SelectError { proto_name } => {
self.event_output(SessionEvent::ProtocolSelectError {
id: self.id,
proto_name,
})
}
ProtocolEvent::Error {
proto_id, error, ..
} => {
Expand Down Expand Up @@ -477,6 +502,18 @@ where
self.flush();
}

if let Some(mut check) = self.timeout_check.take() {
match check.poll() {
Ok(Async::Ready(_)) => {
if self.sub_streams.is_empty() {
self.dead = true;
}
}
Ok(Async::NotReady) => self.timeout_check = Some(check),
Err(e) => debug!("timeout check error: {}", e),
}
}

loop {
match self.socket.poll() {
Ok(Async::Ready(Some(sub_stream))) => self.handle_sub_stream(sub_stream),
Expand Down
3 changes: 3 additions & 0 deletions src/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub(crate) enum ProtocolEvent {
/// Data
data: bytes::Bytes,
},
SelectError {
proto_name: Option<String>,
},
/// Codec error
Error {
/// Stream id
Expand Down

0 comments on commit 71de662

Please sign in to comment.