Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add before handle #158

Merged
merged 3 commits into from
May 23, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::{collections::HashMap, io, sync::Arc, time::Duration};

use tokio::codec::LengthDelimitedCodec;

use crate::{
Expand Down Expand Up @@ -130,6 +129,9 @@ pub(crate) type CodecFn = Box<Fn() -> Box<dyn Codec + Send + 'static> + Send + S
pub(crate) type SessionHandleFn =
Box<FnMut() -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static>> + Send>;
pub(crate) type SelectVersionFn = Box<dyn Fn() -> Option<SelectFn<String>> + Send + Sync + 'static>;
pub(crate) type BeforeReceiveFn = Box<dyn Fn() -> Option<BeforeReceive> + Send + Sync + 'static>;
pub(crate) type BeforeReceive =
Box<dyn Fn(bytes::BytesMut) -> Result<bytes::Bytes, io::Error> + Send + 'static>;

/// Builder for protocol meta
pub struct MetaBuilder {
Expand All @@ -140,6 +142,8 @@ pub struct MetaBuilder {
service_handle: ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>>,
session_handle: SessionHandleFn,
select_version: SelectVersionFn,
before_send: Option<Box<dyn Fn(bytes::Bytes) -> bytes::Bytes + Send + 'static>>,
Copy link
Member

@doitian doitian May 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the choice here that before_send is an Option, but before_receive is a Box<Fn>?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the message has a unified entry, only one fn can be used.

The received is that each protocol in each session has an entry and there is no aggregation, so here need to have a handler that can always generate the exact same function, fn -> Option<fn>.

Each time a protocol open, get a receive fn to it's stream

before_receive: BeforeReceiveFn,
}

impl MetaBuilder {
Expand Down Expand Up @@ -220,6 +224,24 @@ impl MetaBuilder {
self
}

/// Unified processing of messages before they are sent
pub fn before_send<T>(mut self, f: T) -> Self
where
T: Fn(bytes::Bytes) -> bytes::Bytes + 'static + Send,
{
self.before_send = Some(Box::new(f));
self
}

/// Unified processing of messages before user received
pub fn before_receive<T>(mut self, f: T) -> Self
where
T: Fn() -> Option<BeforeReceive> + Send + Sync + 'static,
{
self.before_receive = Box::new(f);
self
}

/// Combine the configuration of this builder to create a ProtocolMeta
pub fn build(self) -> ProtocolMeta {
let meta = Meta {
Expand All @@ -228,11 +250,13 @@ impl MetaBuilder {
support_versions: self.support_versions,
codec: self.codec,
select_version: self.select_version,
before_receive: self.before_receive,
};
ProtocolMeta {
inner: Arc::new(meta),
service_handle: self.service_handle,
session_handle: self.session_handle,
before_send: self.before_send,
}
}
}
Expand All @@ -247,6 +271,8 @@ impl Default for MetaBuilder {
service_handle: ProtocolHandle::Neither,
session_handle: Box::new(|| ProtocolHandle::Neither),
select_version: Box::new(|| None),
before_send: None,
before_receive: Box::new(|| None),
}
}
}
32 changes: 23 additions & 9 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub struct Service<T> {

next_session: SessionId,

before_sends: HashMap<ProtocolId, Box<dyn Fn(bytes::Bytes) -> bytes::Bytes + Send + 'static>>,

/// Can be upgrade to list service level protocols
handle: T,
/// The buffer will be prioritized for distribution to session
Expand Down Expand Up @@ -148,6 +150,7 @@ where

Service {
protocol_configs,
before_sends: HashMap::default(),
handle,
multi_transport: MultiTransport::new(config.timeout),
future_task_sender,
Expand Down Expand Up @@ -1124,13 +1127,16 @@ where
fn start_service_proto_handles(&mut self) {
let ids = self
.protocol_configs
.values()
.map(ProtocolMeta::id)
.collect::<Vec<ProtocolId>>();
for id in ids {
.values_mut()
.map(|meta| (meta.id(), meta.before_send.take()))
.collect::<Vec<(ProtocolId, _)>>();
for (id, before_send) in ids {
if let Some(handle) = self.proto_handle(false, id) {
self.handle_open(handle, id, None);
}
if let Some(function) = before_send {
self.before_sends.insert(id, function);
}
}
}

Expand Down Expand Up @@ -1290,11 +1296,19 @@ where
proto_id,
priority,
data,
} => match target {
TargetSession::Single(id) => self.send_message_to(id, proto_id, priority, data),
TargetSession::Multi(ids) => self.filter_broadcast(ids, proto_id, priority, data),
TargetSession::All => self.broadcast(proto_id, priority, data),
},
} => {
let data = match self.before_sends.get(&proto_id) {
Some(function) => function(data),
None => data,
};
match target {
TargetSession::Single(id) => self.send_message_to(id, proto_id, priority, data),
TargetSession::Multi(ids) => {
self.filter_broadcast(ids, proto_id, priority, data)
}
TargetSession::All => self.broadcast(proto_id, priority, data),
}
}
ServiceTask::Dial { address, target } => {
if !self.dial_protocols.contains_key(&address) {
if let Err(e) = self.dial_inner(address.clone(), target) {
Expand Down
4 changes: 3 additions & 1 deletion src/service/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
builder::{CodecFn, NameFn, SelectVersionFn, SessionHandleFn},
builder::{BeforeReceiveFn, CodecFn, NameFn, SelectVersionFn, SessionHandleFn},
traits::{Codec, ServiceProtocol, SessionProtocol},
yamux::config::Config as YamuxConfig,
ProtocolId, SessionId,
Expand Down Expand Up @@ -56,6 +56,7 @@ pub struct ProtocolMeta {
pub(crate) inner: Arc<Meta>,
pub(crate) service_handle: ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>>,
pub(crate) session_handle: SessionHandleFn,
pub(crate) before_send: Option<Box<dyn Fn(bytes::Bytes) -> bytes::Bytes + Send + 'static>>,
}

impl ProtocolMeta {
Expand Down Expand Up @@ -122,6 +123,7 @@ pub(crate) struct Meta {
pub(crate) support_versions: Vec<String>,
pub(crate) codec: CodecFn,
pub(crate) select_version: SelectVersionFn,
pub(crate) before_receive: BeforeReceiveFn,
}

/// Protocol handle
Expand Down
2 changes: 2 additions & 0 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ where
};

let proto_id = proto.id;
let before_receive_fn = (proto.before_receive)();
let raw_part = sub_stream.into_parts();
let mut part = FramedParts::new(raw_part.io, (proto.codec)());
// Replace buffered data
Expand All @@ -465,6 +466,7 @@ where
.session_proto_sender(self.session_proto_senders.remove(&proto_id))
.keep_buffer(self.keep_buffer)
.event(self.event.contains(&proto_id))
.before_receive(before_receive_fn)
.build(frame);

self.sub_streams
Expand Down
29 changes: 24 additions & 5 deletions src/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::{
};

use crate::{
builder::BeforeReceive,
context::SessionContext,
error::Error,
protocol_handle_stream::{ServiceProtocolEvent, SessionProtocolEvent},
Expand Down Expand Up @@ -99,6 +100,7 @@ pub(crate) struct SubStream<U> {

service_proto_sender: Option<mpsc::Sender<ServiceProtocolEvent>>,
session_proto_sender: Option<mpsc::Sender<SessionProtocolEvent>>,
before_receive: Option<BeforeReceive>,

/// Delay notify with abnormally poor machines
delay: Arc<AtomicBool>,
Expand Down Expand Up @@ -404,19 +406,28 @@ where
data.len()
);

let data = match self.before_receive {
Some(ref function) => match function(data) {
Ok(data) => data,
Err(err) => {
self.error_close(err);
return;
}
},
None => data.freeze(),
};

if self.service_proto_sender.is_some() {
self.service_proto_buf
.push_back(ServiceProtocolEvent::Received {
id: self.context.id,
data: data.clone().freeze(),
data: data.clone(),
})
}

if self.session_proto_sender.is_some() {
self.session_proto_buf
.push_back(SessionProtocolEvent::Received {
data: data.clone().freeze(),
})
.push_back(SessionProtocolEvent::Received { data: data.clone() })
}

self.distribute_to_user_level();
Expand All @@ -425,7 +436,7 @@ where
self.output_event(ProtocolEvent::Message {
id: self.id,
proto_id: self.proto_id,
data: data.freeze(),
data,
priority: Priority::Normal,
})
}
Expand Down Expand Up @@ -573,6 +584,7 @@ pub(crate) struct SubstreamBuilder {

service_proto_sender: Option<mpsc::Sender<ServiceProtocolEvent>>,
session_proto_sender: Option<mpsc::Sender<SessionProtocolEvent>>,
before_receive: Option<BeforeReceive>,

/// Send event to session
event_sender: mpsc::Sender<ProtocolEvent>,
Expand All @@ -591,6 +603,7 @@ impl SubstreamBuilder {
SubstreamBuilder {
service_proto_sender: None,
session_proto_sender: None,
before_receive: None,
event_receiver,
event_sender,
closed,
Expand Down Expand Up @@ -644,6 +657,11 @@ impl SubstreamBuilder {
self
}

pub fn before_receive(mut self, f: Option<BeforeReceive>) -> Self {
self.before_receive = f;
self
}

pub fn build<U>(self, sub_stream: Framed<StreamHandle, U>) -> SubStream<U>
where
U: Codec,
Expand All @@ -670,6 +688,7 @@ impl SubstreamBuilder {

service_proto_sender: self.service_proto_sender,
session_proto_sender: self.session_proto_sender,
before_receive: self.before_receive,

delay: Arc::new(AtomicBool::new(false)),

Expand Down