Skip to content

Commit

Permalink
Merge pull request #65 from nervosnetwork/support-handle-customize-poll
Browse files Browse the repository at this point in the history
Support handle customize poll
  • Loading branch information
TheWaWaR authored Mar 11, 2019
2 parents 1357a2e + 0c820e2 commit 78fbdaa
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 82 deletions.
15 changes: 11 additions & 4 deletions src/protocol_handle_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl ServiceProtocolStream {
.remove_session_notify_senders(id, self.proto_id);
}
Received { id, data } => {
if let Some(session) = self.sessions.get_mut(&id) {
if let Some(session) = self.sessions.get(&id) {
self.handle
.received(&mut self.service_context, session, data);
}
Expand Down Expand Up @@ -119,6 +119,8 @@ impl Stream for ServiceProtocolStream {
}
}

self.handle.poll(&mut self.service_context);

for task in self.service_context.pending_tasks.split_off(0) {
self.service_context.send(task);
}
Expand Down Expand Up @@ -185,14 +187,17 @@ impl SessionProtocolStream {
.connected(&mut self.service_context, &self.context, &version);
}
Disconnected => {
self.handle.disconnected(&mut self.service_context);
self.handle
.disconnected(&mut self.service_context, &self.context);
self.close();
}
Received { data } => {
self.handle.received(&mut self.service_context, data);
self.handle
.received(&mut self.service_context, &self.context, data);
}
Notify { token } => {
self.handle.notify(&mut self.service_context, token);
self.handle
.notify(&mut self.service_context, &self.context, token);
}
Update { listen_addrs } => {
self.service_context.update_listens(listen_addrs);
Expand Down Expand Up @@ -231,6 +236,8 @@ impl Stream for SessionProtocolStream {
}
}

self.handle.poll(&mut self.service_context, &self.context);

for task in self.service_context.pending_tasks.split_off(0) {
self.service_context.send(task);
}
Expand Down
44 changes: 23 additions & 21 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,7 @@ where
// Service handle processing flow
self.handle.handle_event(
&mut self.service_context,
ServiceEvent::SessionClose {
session_context: &session_context,
},
ServiceEvent::SessionClose { session_context },
);
}
}
Expand Down Expand Up @@ -804,15 +802,17 @@ where
);

if self.config.event.contains(&proto_id) {
// event output
self.handle.handle_proto(
&mut self.service_context,
ProtocolEvent::Received {
session_id,
proto_id,
data: data.clone(),
},
);
if let Some(session_context) = self.sessions.get(&session_id) {
// event output
self.handle.handle_proto(
&mut self.service_context,
ProtocolEvent::Received {
session_context,
proto_id,
data: data.clone(),
},
);
}
}

// callback output
Expand Down Expand Up @@ -1111,15 +1111,17 @@ where
.unwrap_or_else(|| false)
{
if self.config.event.contains(&proto_id) {
// event output
self.handle.handle_proto(
&mut self.service_context,
ProtocolEvent::ProtocolSessionNotify {
proto_id,
session_id,
token,
},
)
if let Some(session_context) = self.sessions.get(&session_id) {
// event output
self.handle.handle_proto(
&mut self.service_context,
ProtocolEvent::ProtocolSessionNotify {
proto_id,
session_context,
token,
},
)
}
} else if self
.session_proto_handles
.contains_key(&(session_id, proto_id))
Expand Down
10 changes: 5 additions & 5 deletions src/service/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub enum ServiceEvent<'a> {
/// A session close
SessionClose {
/// Session context
session_context: &'a SessionContext,
session_context: SessionContext,
},
/// A session open
SessionOpen {
Expand All @@ -77,8 +77,8 @@ pub enum ProtocolEvent<'a> {
},
/// Received protocol data
Received {
/// Session id
session_id: SessionId,
/// Session context
session_context: &'a SessionContext,
/// Protocol id
proto_id: ProtocolId,
/// Protocol version
Expand All @@ -100,8 +100,8 @@ pub enum ProtocolEvent<'a> {
},
/// Session-level notify task
ProtocolSessionNotify {
/// Session id
session_id: SessionId,
/// Session context
session_context: &'a SessionContext,
/// Protocol id
proto_id: ProtocolId,
/// Notify token
Expand Down
69 changes: 17 additions & 52 deletions src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use tokio::codec::{Decoder, Encoder};

use crate::{
context::{ServiceContext, SessionContext},
service::{ProtocolEvent, ProtocolHandle, ServiceError, ServiceEvent},
ProtocolId,
service::{ProtocolEvent, ServiceError, ServiceEvent},
};

/// Service handle
Expand Down Expand Up @@ -83,6 +82,9 @@ pub trait ServiceProtocol {
}
/// Called when the Service receives the notify task
fn notify(&mut self, _service: &mut ServiceContext, _token: u64) {}
/// Behave like `Stream::poll`, but nothing output
#[inline]
fn poll(&mut self, _service: &mut ServiceContext) {}
}

/// Session level protocol handle
Expand All @@ -96,14 +98,23 @@ pub trait SessionProtocol {
) {
}
/// Called when closing protocol
fn disconnected(&mut self, _service: &mut ServiceContext) {}
fn disconnected(&mut self, _service: &mut ServiceContext, _session: &SessionContext) {}
/// Called when the corresponding protocol message is received
fn received(&mut self, _service: &mut ServiceContext, _data: bytes::Bytes) {}
fn received(
&mut self,
_service: &mut ServiceContext,
_session: &SessionContext,
_data: bytes::Bytes,
) {
}
/// Called when the session receives the notify task
fn notify(&mut self, _service: &mut ServiceContext, _token: u64) {}
fn notify(&mut self, _service: &mut ServiceContext, _session: &SessionContext, _token: u64) {}
/// Behave like `Stream::poll`, but nothing output, shutdown when session close
#[inline]
fn poll(&mut self, _service: &mut ServiceContext, _session: &SessionContext) {}
}

/// A trait can define codec
/// A trait can define codec, just wrapper `Decoder` and `Encoder`
pub trait Codec:
Decoder<Item = bytes::BytesMut, Error = io::Error> + Encoder<Item = bytes::Bytes, Error = io::Error>
{
Expand Down Expand Up @@ -133,52 +144,6 @@ impl Encoder for Box<dyn Codec + Send + 'static> {
}
}

/// Define the minimum data required for a custom protocol
pub trait ProtocolMeta {
/// Protocol id
fn id(&self) -> ProtocolId;

/// Protocol name, default is "/p2p/protocol_id"
#[inline]
fn name(&self) -> String {
format!("/p2p/{}", self.id())
}

/// Protocol supported version
fn support_versions(&self) -> Vec<String> {
vec!["0.0.1".to_owned()]
}

/// The codec used by the custom protocol, such as `LengthDelimitedCodec` by tokio
fn codec(&self) -> Box<dyn Codec + Send + 'static>;

/// A service level callback handle for a protocol.
///
/// ---
///
/// #### Behavior
///
/// This function is called when the protocol is first opened in the service
/// and remains in memory until the entire service is closed.
fn service_handle(&self) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> {
ProtocolHandle::Neither
}

/// A session level callback handle for a protocol.
///
/// ---
///
/// #### Behavior
///
/// When a session is opened, whenever the protocol of the session is opened,
/// the function will be called again to generate the corresponding exclusive handle.
///
/// Correspondingly, whenever the protocol is closed, the corresponding exclusive handle is cleared.
fn session_handle(&self) -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static>> {
ProtocolHandle::Neither
}
}

impl ServiceHandle for Box<dyn ServiceHandle + Send + 'static> {
fn handle_error(&mut self, control: &mut ServiceContext, error: ServiceError) {
(&mut **self).handle_error(control, error)
Expand Down

0 comments on commit 78fbdaa

Please sign in to comment.