Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support handle customize poll #65

Merged
merged 1 commit into from
Mar 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/protocol_handle_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl ServiceProtocolStream {
.remove_session_notify_senders(id, self.proto_id);
}
Received { id, data } => {
if let Some(session) = self.sessions.get_mut(&id) {
if let Some(session) = self.sessions.get(&id) {
self.handle
.received(&mut self.service_context, session, data);
}
Expand Down Expand Up @@ -119,6 +119,8 @@ impl Stream for ServiceProtocolStream {
}
}

self.handle.poll(&mut self.service_context);

for task in self.service_context.pending_tasks.split_off(0) {
self.service_context.send(task);
}
Expand Down Expand Up @@ -185,14 +187,17 @@ impl SessionProtocolStream {
.connected(&mut self.service_context, &self.context, &version);
}
Disconnected => {
self.handle.disconnected(&mut self.service_context);
self.handle
.disconnected(&mut self.service_context, &self.context);
self.close();
}
Received { data } => {
self.handle.received(&mut self.service_context, data);
self.handle
.received(&mut self.service_context, &self.context, data);
}
Notify { token } => {
self.handle.notify(&mut self.service_context, token);
self.handle
.notify(&mut self.service_context, &self.context, token);
}
Update { listen_addrs } => {
self.service_context.update_listens(listen_addrs);
Expand Down Expand Up @@ -231,6 +236,8 @@ impl Stream for SessionProtocolStream {
}
}

self.handle.poll(&mut self.service_context, &self.context);

for task in self.service_context.pending_tasks.split_off(0) {
self.service_context.send(task);
}
Expand Down
44 changes: 23 additions & 21 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,7 @@ where
// Service handle processing flow
self.handle.handle_event(
&mut self.service_context,
ServiceEvent::SessionClose {
session_context: &session_context,
},
ServiceEvent::SessionClose { session_context },
);
}
}
Expand Down Expand Up @@ -804,15 +802,17 @@ where
);

if self.config.event.contains(&proto_id) {
// event output
self.handle.handle_proto(
&mut self.service_context,
ProtocolEvent::Received {
session_id,
proto_id,
data: data.clone(),
},
);
if let Some(session_context) = self.sessions.get(&session_id) {
// event output
self.handle.handle_proto(
&mut self.service_context,
ProtocolEvent::Received {
session_context,
proto_id,
data: data.clone(),
},
);
}
}

// callback output
Expand Down Expand Up @@ -1111,15 +1111,17 @@ where
.unwrap_or_else(|| false)
{
if self.config.event.contains(&proto_id) {
// event output
self.handle.handle_proto(
&mut self.service_context,
ProtocolEvent::ProtocolSessionNotify {
proto_id,
session_id,
token,
},
)
if let Some(session_context) = self.sessions.get(&session_id) {
// event output
self.handle.handle_proto(
&mut self.service_context,
ProtocolEvent::ProtocolSessionNotify {
proto_id,
session_context,
token,
},
)
}
} else if self
.session_proto_handles
.contains_key(&(session_id, proto_id))
Expand Down
10 changes: 5 additions & 5 deletions src/service/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub enum ServiceEvent<'a> {
/// A session close
SessionClose {
/// Session context
session_context: &'a SessionContext,
session_context: SessionContext,
},
/// A session open
SessionOpen {
Expand All @@ -77,8 +77,8 @@ pub enum ProtocolEvent<'a> {
},
/// Received protocol data
Received {
/// Session id
session_id: SessionId,
/// Session context
session_context: &'a SessionContext,
/// Protocol id
proto_id: ProtocolId,
/// Protocol version
Expand All @@ -100,8 +100,8 @@ pub enum ProtocolEvent<'a> {
},
/// Session-level notify task
ProtocolSessionNotify {
/// Session id
session_id: SessionId,
/// Session context
session_context: &'a SessionContext,
/// Protocol id
proto_id: ProtocolId,
/// Notify token
Expand Down
69 changes: 17 additions & 52 deletions src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use tokio::codec::{Decoder, Encoder};

use crate::{
context::{ServiceContext, SessionContext},
service::{ProtocolEvent, ProtocolHandle, ServiceError, ServiceEvent},
ProtocolId,
service::{ProtocolEvent, ServiceError, ServiceEvent},
};

/// Service handle
Expand Down Expand Up @@ -83,6 +82,9 @@ pub trait ServiceProtocol {
}
/// Called when the Service receives the notify task
fn notify(&mut self, _service: &mut ServiceContext, _token: u64) {}
/// Behave like `Stream::poll`, but nothing output
#[inline]
fn poll(&mut self, _service: &mut ServiceContext) {}
}

/// Session level protocol handle
Expand All @@ -96,14 +98,23 @@ pub trait SessionProtocol {
) {
}
/// Called when closing protocol
fn disconnected(&mut self, _service: &mut ServiceContext) {}
fn disconnected(&mut self, _service: &mut ServiceContext, _session: &SessionContext) {}
/// Called when the corresponding protocol message is received
fn received(&mut self, _service: &mut ServiceContext, _data: bytes::Bytes) {}
fn received(
&mut self,
_service: &mut ServiceContext,
_session: &SessionContext,
_data: bytes::Bytes,
) {
}
/// Called when the session receives the notify task
fn notify(&mut self, _service: &mut ServiceContext, _token: u64) {}
fn notify(&mut self, _service: &mut ServiceContext, _session: &SessionContext, _token: u64) {}
/// Behave like `Stream::poll`, but nothing output, shutdown when session close
#[inline]
fn poll(&mut self, _service: &mut ServiceContext, _session: &SessionContext) {}
}

/// A trait can define codec
/// A trait can define codec, just wrapper `Decoder` and `Encoder`
pub trait Codec:
Decoder<Item = bytes::BytesMut, Error = io::Error> + Encoder<Item = bytes::Bytes, Error = io::Error>
{
Expand Down Expand Up @@ -133,52 +144,6 @@ impl Encoder for Box<dyn Codec + Send + 'static> {
}
}

/// Define the minimum data required for a custom protocol
pub trait ProtocolMeta {
/// Protocol id
fn id(&self) -> ProtocolId;

/// Protocol name, default is "/p2p/protocol_id"
#[inline]
fn name(&self) -> String {
format!("/p2p/{}", self.id())
}

/// Protocol supported version
fn support_versions(&self) -> Vec<String> {
vec!["0.0.1".to_owned()]
}

/// The codec used by the custom protocol, such as `LengthDelimitedCodec` by tokio
fn codec(&self) -> Box<dyn Codec + Send + 'static>;

/// A service level callback handle for a protocol.
///
/// ---
///
/// #### Behavior
///
/// This function is called when the protocol is first opened in the service
/// and remains in memory until the entire service is closed.
fn service_handle(&self) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> {
ProtocolHandle::Neither
}

/// A session level callback handle for a protocol.
///
/// ---
///
/// #### Behavior
///
/// When a session is opened, whenever the protocol of the session is opened,
/// the function will be called again to generate the corresponding exclusive handle.
///
/// Correspondingly, whenever the protocol is closed, the corresponding exclusive handle is cleared.
fn session_handle(&self) -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static>> {
ProtocolHandle::Neither
}
}

impl ServiceHandle for Box<dyn ServiceHandle + Send + 'static> {
fn handle_error(&mut self, control: &mut ServiceContext, error: ServiceError) {
(&mut **self).handle_error(control, error)
Expand Down