Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report open proto failed #54

Merged
merged 2 commits into from
Feb 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ description = "Minimal implementation for a multiplexed p2p network framework."
authors = ["piaoliu <441594700@qq.com>", "Nervos Core Dev <dev@nervos.org>"]
repository = "https://github.com/nervosnetwork/p2p"
include = ["Cargo.toml", "src/*", "README.md", "LICENSE"]
readme = "README.md"
keywords = ["network", "peer-to-peer"]
categories = ["network-programming", "asynchronous"]
edition = "2018"
Expand Down
2 changes: 1 addition & 1 deletion src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
};

/// Session context, contains basic information about the current connection
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct SessionContext {
pub(crate) event_sender: mpsc::Sender<SessionEvent>,
/// Session's ID
Expand Down
25 changes: 22 additions & 3 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
traits::{ProtocolMeta, ServiceHandle, ServiceProtocol, SessionProtocol},
utils::{dns::DNSResolver, extract_peer_id, multiaddr_to_socketaddr},
yamux::{session::SessionType, Config as YamuxConfig},
ProtocolId, SessionId, StreamId,
ProtocolId, SessionId,
};

mod control;
Expand Down Expand Up @@ -66,10 +66,18 @@ pub enum ServiceError {
/// error
error: Error<ServiceTask>,
},
/// Protocol select fail
ProtocolSelectError {
/// Protocol name, if none, timeout or other net problem,
/// if Some, don't support this proto
proto_name: Option<String>,
/// Session context
session_context: SessionContext,
},
/// Protocol error during interaction
ProtocolError {
/// Stream id
id: StreamId,
/// Session id
id: SessionId,
/// Protocol id
proto_id: ProtocolId,
/// Codec error
Expand Down Expand Up @@ -1022,6 +1030,17 @@ where
..
} => self.protocol_open(id, proto_id, version),
SessionEvent::ProtocolClose { id, proto_id, .. } => self.protocol_close(id, proto_id),
SessionEvent::ProtocolSelectError { id, proto_name } => {
if let Some(session_context) = self.sessions.get(&id).cloned() {
self.handle.handle_error(
&mut self.service_context,
ServiceError::ProtocolSelectError {
proto_name,
session_context,
},
)
}
}
SessionEvent::ProtocolError {
id,
proto_id,
Expand Down
145 changes: 91 additions & 54 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ use futures::{
use log::{debug, error, trace, warn};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::{error, io, time::Duration};
use tokio::codec::{Decoder, Encoder, Framed, FramedParts};
use std::{
error, io,
time::{Duration, Instant},
};
use tokio::prelude::{AsyncRead, AsyncWrite, FutureExt};
use tokio::{
codec::{Decoder, Encoder, Framed, FramedParts, LengthDelimitedCodec},
timer::Delay,
};

use crate::{
error::Error,
Expand Down Expand Up @@ -95,10 +101,16 @@ pub(crate) enum SessionEvent {
/// Stream id
stream_id: StreamId,
},
ProtocolSelectError {
/// Session id
id: SessionId,
/// proto_name
proto_name: Option<String>,
},
/// Codec error
ProtocolError {
/// Stream id
id: StreamId,
/// Session id
id: SessionId,
/// Protocol id
proto_id: ProtocolId,
/// Codec error
Expand All @@ -114,6 +126,7 @@ pub(crate) struct Session<T, U> {

id: SessionId,
timeout: Duration,
timeout_check: Option<Delay>,

dead: bool,

Expand Down Expand Up @@ -166,6 +179,7 @@ where
protocol_configs: meta.protocol_configs,
id: meta.id,
timeout: meta.timeout,
timeout_check: Some(Delay::new(Instant::now() + meta.timeout)),
ty: meta.ty,
next_stream: 0,
sub_streams: HashMap::default(),
Expand All @@ -181,21 +195,24 @@ where
}
}

/// After the session is established, the client is requested to open some custom protocol sub stream.
pub fn open_proto_stream(&mut self, proto_name: &str) {
debug!("try open proto, {}", proto_name);
/// select procedure
#[inline(always)]
fn select_procedure(
&mut self,
procedure: impl Future<
Item = (
Framed<StreamHandle, LengthDelimitedCodec>,
String,
Option<String>,
),
Error = io::Error,
> + Send
+ 'static,
) {
let event_sender = self.proto_event_sender.clone();
let handle = self.socket.open_stream().unwrap();
let versions = self
.protocol_configs
.get(proto_name)
.unwrap()
.support_versions();
let proto_info = ProtocolInfo::new(&proto_name, versions);

let task = client_select(handle, proto_info)
.and_then(|(handle, name, version)| {
match version {
let task = procedure.timeout(self.timeout).then(|result| {
match result {
Ok((handle, name, version)) => match version {
Some(version) => {
let send_task = event_sender.send(ProtocolEvent::Open {
sub_stream: Box::new(handle),
Expand All @@ -206,18 +223,47 @@ where
error!("stream send back error: {:?}", err);
}));
}
None => debug!("Negotiation to open the protocol {} failed", name),
None => {
debug!("Negotiation to open the protocol {} failed", name);
let send_task = event_sender.send(ProtocolEvent::SelectError {
proto_name: Some(name),
});
tokio::spawn(send_task.map(|_| ()).map_err(|err| {
error!("select error send back error: {:?}", err);
}));
}
},
Err(err) => {
debug!("stream protocol select err: {:?}", err);
let send_task =
event_sender.send(ProtocolEvent::SelectError { proto_name: None });
tokio::spawn(send_task.map(|_| ()).map_err(|err| {
error!("select error send back error: {:?}", err);
}));
}
Ok(())
})
.timeout(self.timeout)
.map_err(|err| {
trace!("stream protocol select err: {:?}", err);
});
}

Ok(())
});

tokio::spawn(task);
}

/// After the session is established, the client is requested to open some custom protocol sub stream.
pub fn open_proto_stream(&mut self, proto_name: &str) {
debug!("try open proto, {}", proto_name);
let handle = self.socket.open_stream().unwrap();
let versions = self
.protocol_configs
.get(proto_name)
.unwrap()
.support_versions();
let proto_info = ProtocolInfo::new(&proto_name, versions);

let task = client_select(handle, proto_info);
self.select_procedure(task);
}

/// Push the generated event to the Service
#[inline]
fn event_output(&mut self, event: SessionEvent) {
Expand Down Expand Up @@ -277,7 +323,6 @@ where

/// Handling client-initiated open protocol sub stream requests
fn handle_sub_stream(&mut self, sub_stream: StreamHandle) {
let event_sender = self.proto_event_sender.clone();
let proto_metas = self
.protocol_configs
.values()
Expand All @@ -288,34 +333,8 @@ where
})
.collect();

let task = server_select(sub_stream, proto_metas)
.and_then(|(handle, name, version)| {
match version {
Some(version) => {
let send_task = event_sender.send(ProtocolEvent::Open {
sub_stream: Box::new(handle),
proto_name: name,
version,
});

tokio::spawn(send_task.map(|_| ()).map_err(|err| {
error!("stream send back error: {:?}", err);
}));
}
None => {
// server close the connect
let _ = handle.into_inner().shutdown();
debug!("negotiation to open the protocol [{}] failed", name);
}
}
Ok(())
})
.timeout(self.timeout)
.map_err(|err| {
trace!("stream protocol select err: {:?}", err);
});

tokio::spawn(task);
let task = server_select(sub_stream, proto_metas);
self.select_procedure(task);
}

/// Handling events uploaded by the protocol stream
Expand Down Expand Up @@ -384,6 +403,12 @@ where
data,
})
}
ProtocolEvent::SelectError { proto_name } => {
self.event_output(SessionEvent::ProtocolSelectError {
id: self.id,
proto_name,
})
}
ProtocolEvent::Error {
proto_id, error, ..
} => {
Expand Down Expand Up @@ -477,6 +502,18 @@ where
self.flush();
}

if let Some(mut check) = self.timeout_check.take() {
match check.poll() {
Ok(Async::Ready(_)) => {
if self.sub_streams.is_empty() {
self.dead = true;
}
}
Ok(Async::NotReady) => self.timeout_check = Some(check),
Err(e) => debug!("timeout check error: {}", e),
}
}

loop {
match self.socket.poll() {
Ok(Async::Ready(Some(sub_stream))) => self.handle_sub_stream(sub_stream),
Expand Down
3 changes: 3 additions & 0 deletions src/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub(crate) enum ProtocolEvent {
/// Data
data: bytes::Bytes,
},
SelectError {
proto_name: Option<String>,
},
/// Codec error
Error {
/// Stream id
Expand Down