Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add before handle #158

Merged
merged 3 commits into from
May 23, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tentacle"
version = "0.2.0-alpha.18"
version = "0.2.0"
license = "MIT"
description = "Minimal implementation for a multiplexed p2p network framework."
authors = ["piaoliu <441594700@qq.com>", "Nervos Core Dev <dev@nervos.org>"]
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ $ RUST_LOG=simple=info,tentacle=debug cargo run --example simple

4. Now you can see some data interaction information on the terminal.

You can see more detailed example in these two repos: [ckb](https://github.com/nervosnetwork/ckb)/[cita](https://github.com/cryptape/cita).

## Why?

Because when I use `rust-libp2p`, I have encountered some difficult problems,
Expand Down
4 changes: 2 additions & 2 deletions protocols/discovery/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tentacle-discovery"
version = "0.2.4"
version = "0.2.5"
authors = ["Linfeng Qian <thewawar@gmail.com>"]
license = "MIT"
description = "p2p discovery protocol main reference bitcoin"
Expand All @@ -10,7 +10,7 @@ categories = ["network-programming", "asynchronous"]
edition = "2018"

[dependencies]
p2p = { path = "../..", version = "0.2.0-alpha.13", package = "tentacle" }
p2p = { path = "../..", version = "0.2.0", package = "tentacle" }
bytes = "0.4"
byteorder = "1.2"
fnv = "1.0"
Expand Down
4 changes: 2 additions & 2 deletions protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tentacle-identify"
version = "0.2.4"
version = "0.2.5"
authors = ["Qian Linfeng <thewawar@gmail.com>"]
license = "MIT"
description = "p2p identify protocol"
Expand All @@ -10,7 +10,7 @@ categories = ["network-programming", "asynchronous"]
edition = "2018"

[dependencies]
p2p = { path = "../..", version = "0.2.0-alpha.13", package = "tentacle" }
p2p = { path = "../..", version = "0.2.0", package = "tentacle" }
bytes = "0.4"
flatbuffers = "0.6.0"
flatbuffers-verifier = "0.2.0"
Expand Down
4 changes: 2 additions & 2 deletions protocols/ping/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tentacle-ping"
version = "0.3.4"
version = "0.3.5"
authors = ["Nervos Core Dev <dev@nervos.org>"]
license = "MIT"
keywords = ["network", "peer-to-peer", "p2p", "ping"]
Expand All @@ -10,7 +10,7 @@ description = "ping protocol implementation for tentacle"
edition = "2018"

[dependencies]
p2p = { path = "../..", version = "0.2.0-alpha.13", package = "tentacle" }
p2p = { path = "../..", version = "0.2.0", package = "tentacle" }
log = "0.4"
flatbuffers = "0.6.0"
flatbuffers-verifier = "0.2.0"
Expand Down
32 changes: 29 additions & 3 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::{collections::HashMap, io, sync::Arc, time::Duration};

use tokio::codec::LengthDelimitedCodec;

use crate::{
Expand Down Expand Up @@ -130,6 +129,9 @@ pub(crate) type CodecFn = Box<Fn() -> Box<dyn Codec + Send + 'static> + Send + S
pub(crate) type SessionHandleFn =
Box<FnMut() -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static>> + Send>;
pub(crate) type SelectVersionFn = Box<dyn Fn() -> Option<SelectFn<String>> + Send + Sync + 'static>;
pub(crate) type BeforeReceiveFn = Box<dyn Fn() -> Option<BeforeReceive> + Send + Sync + 'static>;
pub(crate) type BeforeReceive =
Box<dyn Fn(bytes::BytesMut) -> Result<bytes::Bytes, io::Error> + Send + 'static>;

/// Builder for protocol meta
pub struct MetaBuilder {
Expand All @@ -140,6 +142,8 @@ pub struct MetaBuilder {
service_handle: ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>>,
session_handle: SessionHandleFn,
select_version: SelectVersionFn,
before_send: Option<Box<dyn Fn(bytes::Bytes) -> bytes::Bytes + Send + 'static>>,
Copy link
Member

@doitian doitian May 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the choice here that before_send is an Option, but before_receive is a Box<Fn>?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the message has a unified entry, only one fn can be used.

The received is that each protocol in each session has an entry and there is no aggregation, so here need to have a handler that can always generate the exact same function, fn -> Option<fn>.

Each time a protocol open, get a receive fn to it's stream

before_receive: BeforeReceiveFn,
}

impl MetaBuilder {
Expand Down Expand Up @@ -220,6 +224,24 @@ impl MetaBuilder {
self
}

/// Unified processing of messages before they are sent
pub fn before_send<T>(mut self, f: T) -> Self
where
T: Fn(bytes::Bytes) -> bytes::Bytes + 'static + Send,
{
self.before_send = Some(Box::new(f));
self
}

/// Unified processing of messages before user received
pub fn before_receive<T>(mut self, f: T) -> Self
where
T: Fn() -> Option<BeforeReceive> + Send + Sync + 'static,
{
self.before_receive = Box::new(f);
self
}

/// Combine the configuration of this builder to create a ProtocolMeta
pub fn build(self) -> ProtocolMeta {
let meta = Meta {
Expand All @@ -228,11 +250,13 @@ impl MetaBuilder {
support_versions: self.support_versions,
codec: self.codec,
select_version: self.select_version,
before_receive: self.before_receive,
};
ProtocolMeta {
inner: Arc::new(meta),
service_handle: self.service_handle,
session_handle: self.session_handle,
before_send: self.before_send,
}
}
}
Expand All @@ -247,6 +271,8 @@ impl Default for MetaBuilder {
service_handle: ProtocolHandle::Neither,
session_handle: Box::new(|| ProtocolHandle::Neither),
select_version: Box::new(|| None),
before_send: None,
before_receive: Box::new(|| None),
}
}
}
38 changes: 26 additions & 12 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub(crate) const BUF_SHRINK_THRESHOLD: usize = u8::max_value() as usize;
/// Received from user, aggregate mode
pub(crate) const RECEIVED_BUFFER_SIZE: usize = 2048;
/// Use to receive open/close event, no need too large
pub(crate) const RECEIVED_SIZE: usize = 128;
pub(crate) const RECEIVED_SIZE: usize = 512;
/// Send to remote, distribute mode
pub(crate) const SEND_SIZE: usize = 512;
pub(crate) const DELAY_TIME: Duration = Duration::from_millis(300);
Expand Down Expand Up @@ -79,6 +79,8 @@ pub struct Service<T> {

next_session: SessionId,

before_sends: HashMap<ProtocolId, Box<dyn Fn(bytes::Bytes) -> bytes::Bytes + Send + 'static>>,

/// Can be upgrade to list service level protocols
handle: T,
/// The buffer will be prioritized for distribution to session
Expand Down Expand Up @@ -148,6 +150,7 @@ where

Service {
protocol_configs,
before_sends: HashMap::default(),
handle,
multi_transport: MultiTransport::new(config.timeout),
future_task_sender,
Expand Down Expand Up @@ -1121,16 +1124,19 @@ where
self.send_future_task(Box::new(task))
}

fn start_service_proto_handles(&mut self) {
fn init_proto_handles(&mut self) {
let ids = self
.protocol_configs
.values()
.map(ProtocolMeta::id)
.collect::<Vec<ProtocolId>>();
for id in ids {
.values_mut()
.map(|meta| (meta.id(), meta.before_send.take()))
.collect::<Vec<(ProtocolId, _)>>();
for (id, before_send) in ids {
if let Some(handle) = self.proto_handle(false, id) {
self.handle_open(handle, id, None);
}
if let Some(function) = before_send {
self.before_sends.insert(id, function);
}
}
}

Expand Down Expand Up @@ -1290,11 +1296,19 @@ where
proto_id,
priority,
data,
} => match target {
TargetSession::Single(id) => self.send_message_to(id, proto_id, priority, data),
TargetSession::Multi(ids) => self.filter_broadcast(ids, proto_id, priority, data),
TargetSession::All => self.broadcast(proto_id, priority, data),
},
} => {
let data = match self.before_sends.get(&proto_id) {
Some(function) => function(data),
None => data,
};
match target {
TargetSession::Single(id) => self.send_message_to(id, proto_id, priority, data),
TargetSession::Multi(ids) => {
self.filter_broadcast(ids, proto_id, priority, data)
}
TargetSession::All => self.broadcast(proto_id, priority, data),
}
}
ServiceTask::Dial { address, target } => {
if !self.dial_protocols.contains_key(&address) {
if let Err(e) = self.dial_inner(address.clone(), target) {
Expand Down Expand Up @@ -1603,7 +1617,7 @@ where
if let Some(stream) = self.future_task_manager.take() {
tokio::spawn(stream.for_each(|_| Ok(())));
self.notify_queue();
self.start_service_proto_handles();
self.init_proto_handles();
}

if !self.write_buf.is_empty()
Expand Down
4 changes: 3 additions & 1 deletion src/service/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
builder::{CodecFn, NameFn, SelectVersionFn, SessionHandleFn},
builder::{BeforeReceiveFn, CodecFn, NameFn, SelectVersionFn, SessionHandleFn},
traits::{Codec, ServiceProtocol, SessionProtocol},
yamux::config::Config as YamuxConfig,
ProtocolId, SessionId,
Expand Down Expand Up @@ -56,6 +56,7 @@ pub struct ProtocolMeta {
pub(crate) inner: Arc<Meta>,
pub(crate) service_handle: ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>>,
pub(crate) session_handle: SessionHandleFn,
pub(crate) before_send: Option<Box<dyn Fn(bytes::Bytes) -> bytes::Bytes + Send + 'static>>,
}

impl ProtocolMeta {
Expand Down Expand Up @@ -122,6 +123,7 @@ pub(crate) struct Meta {
pub(crate) support_versions: Vec<String>,
pub(crate) codec: CodecFn,
pub(crate) select_version: SelectVersionFn,
pub(crate) before_receive: BeforeReceiveFn,
}

/// Protocol handle
Expand Down
2 changes: 2 additions & 0 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ where
};

let proto_id = proto.id;
let before_receive_fn = (proto.before_receive)();
let raw_part = sub_stream.into_parts();
let mut part = FramedParts::new(raw_part.io, (proto.codec)());
// Replace buffered data
Expand All @@ -465,6 +466,7 @@ where
.session_proto_sender(self.session_proto_senders.remove(&proto_id))
.keep_buffer(self.keep_buffer)
.event(self.event.contains(&proto_id))
.before_receive(before_receive_fn)
.build(frame);

self.sub_streams
Expand Down
29 changes: 24 additions & 5 deletions src/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::{
};

use crate::{
builder::BeforeReceive,
context::SessionContext,
error::Error,
protocol_handle_stream::{ServiceProtocolEvent, SessionProtocolEvent},
Expand Down Expand Up @@ -99,6 +100,7 @@ pub(crate) struct SubStream<U> {

service_proto_sender: Option<mpsc::Sender<ServiceProtocolEvent>>,
session_proto_sender: Option<mpsc::Sender<SessionProtocolEvent>>,
before_receive: Option<BeforeReceive>,

/// Delay notify with abnormally poor machines
delay: Arc<AtomicBool>,
Expand Down Expand Up @@ -404,19 +406,28 @@ where
data.len()
);

let data = match self.before_receive {
Some(ref function) => match function(data) {
Ok(data) => data,
Err(err) => {
self.error_close(err);
return;
}
},
None => data.freeze(),
};

if self.service_proto_sender.is_some() {
self.service_proto_buf
.push_back(ServiceProtocolEvent::Received {
id: self.context.id,
data: data.clone().freeze(),
data: data.clone(),
})
}

if self.session_proto_sender.is_some() {
self.session_proto_buf
.push_back(SessionProtocolEvent::Received {
data: data.clone().freeze(),
})
.push_back(SessionProtocolEvent::Received { data: data.clone() })
}

self.distribute_to_user_level();
Expand All @@ -425,7 +436,7 @@ where
self.output_event(ProtocolEvent::Message {
id: self.id,
proto_id: self.proto_id,
data: data.freeze(),
data,
priority: Priority::Normal,
})
}
Expand Down Expand Up @@ -573,6 +584,7 @@ pub(crate) struct SubstreamBuilder {

service_proto_sender: Option<mpsc::Sender<ServiceProtocolEvent>>,
session_proto_sender: Option<mpsc::Sender<SessionProtocolEvent>>,
before_receive: Option<BeforeReceive>,

/// Send event to session
event_sender: mpsc::Sender<ProtocolEvent>,
Expand All @@ -591,6 +603,7 @@ impl SubstreamBuilder {
SubstreamBuilder {
service_proto_sender: None,
session_proto_sender: None,
before_receive: None,
event_receiver,
event_sender,
closed,
Expand Down Expand Up @@ -644,6 +657,11 @@ impl SubstreamBuilder {
self
}

pub fn before_receive(mut self, f: Option<BeforeReceive>) -> Self {
self.before_receive = f;
self
}

pub fn build<U>(self, sub_stream: Framed<StreamHandle, U>) -> SubStream<U>
where
U: Codec,
Expand All @@ -670,6 +688,7 @@ impl SubstreamBuilder {

service_proto_sender: self.service_proto_sender,
session_proto_sender: self.session_proto_sender,
before_receive: self.before_receive,

delay: Arc::new(AtomicBool::new(false)),

Expand Down
Loading