Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow users to customize the version select procedure #97

Merged
merged 1 commit into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;
use tokio::codec::LengthDelimitedCodec;

use crate::{
protocol_select::SelectFn,
secio::SecioKeyPair,
service::{
config::{Meta, ServiceConfig},
Expand Down Expand Up @@ -109,6 +110,7 @@ pub(crate) type NameFn = Box<Fn(ProtocolId) -> String + Send + Sync>;
pub(crate) type CodecFn = Box<Fn() -> Box<dyn Codec + Send + 'static> + Send + Sync>;
pub(crate) type SessionHandleFn =
Box<Fn(ProtocolId) -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static>> + Send>;
pub(crate) type SelectVersionFn = Box<dyn Fn() -> Option<SelectFn<String>> + Send + Sync + 'static>;

/// Builder for protocol meta
pub struct MetaBuilder {
Expand All @@ -118,6 +120,7 @@ pub struct MetaBuilder {
codec: CodecFn,
service_handle: ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>>,
session_handle: SessionHandleFn,
select_version: SelectVersionFn,
}

impl MetaBuilder {
Expand Down Expand Up @@ -191,13 +194,23 @@ impl MetaBuilder {
self
}

/// Protocol version selection rule, default is [select_version](../protocol_select/fn.select_version.html)
pub fn select_version<T>(mut self, f: T) -> Self
where
T: Fn() -> Option<SelectFn<String>> + Send + Sync + 'static,
{
self.select_version = Box::new(f);
self
}

/// Combine the configuration of this builder to create a ProtocolMeta
pub fn build(self) -> ProtocolMeta {
let meta = Meta {
id: self.id,
name: self.name,
support_versions: self.support_versions,
codec: self.codec,
select_version: self.select_version,
};
ProtocolMeta {
inner: Arc::new(meta),
Expand All @@ -216,6 +229,7 @@ impl Default for MetaBuilder {
codec: Box::new(|| Box::new(LengthDelimitedCodec::new())),
service_handle: ProtocolHandle::Neither,
session_handle: Box::new(|_| ProtocolHandle::Neither),
select_version: Box::new(|| None),
}
}
}
35 changes: 24 additions & 11 deletions src/protocol_select/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ mod protocol_select_generated;
#[allow(dead_code)]
mod protocol_select_generated_verifier;

/// Function for protocol version select
pub type SelectFn<T> = Box<dyn Fn(&[T], &[T]) -> Option<T> + Send + 'static>;

/// Protocol Info
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ProtocolInfo {
Expand Down Expand Up @@ -140,7 +143,7 @@ pub(crate) fn client_select<T: AsyncWrite + AsyncRead + Send>(
/// plus the protocol name, plus the version option.
pub(crate) fn server_select<T: AsyncWrite + AsyncRead + Send>(
handle: T,
proto_infos: HashMap<String, ProtocolInfo>,
proto_infos: HashMap<String, (ProtocolInfo, Option<SelectFn<String>>)>,
) -> impl Future<Item = (Framed<T, LengthDelimitedCodec>, String, Option<String>), Error = io::Error>
{
let socket = Framed::new(handle, LengthDelimitedCodec::new());
Expand All @@ -167,14 +170,24 @@ pub(crate) fn server_select<T: AsyncWrite + AsyncRead + Send>(
return Err(err);
}
};
let version = proto_infos
.remove(&remote_info.name)
.and_then(|local_info| {
select_version(
&local_info.support_versions,
&remote_info.support_versions,
)
});
let version =
proto_infos
.remove(&remote_info.name)
.and_then(|(local_info, select)| {
select
.map(|f| {
f(
&local_info.support_versions,
&remote_info.support_versions,
)
})
.unwrap_or_else(|| {
select_version(
&local_info.support_versions,
&remote_info.support_versions,
)
})
});
Ok((socket, remote_info.name, version))
})
})
Expand All @@ -195,7 +208,7 @@ pub(crate) fn server_select<T: AsyncWrite + AsyncRead + Send>(

/// Choose the highest version of the two sides, assume that slices are sorted
#[inline]
fn select_version<T: Ord + Clone>(local: &[T], remote: &[T]) -> Option<T> {
pub fn select_version<T: Ord + Clone>(local: &[T], remote: &[T]) -> Option<T> {
let (mut local_iter, mut remote_iter) = (local.iter().rev(), remote.iter().rev());
let (mut local, mut remote) = (local_iter.next(), remote_iter.next());
while let (Some(l), Some(r)) = (local, remote) {
Expand Down Expand Up @@ -263,7 +276,7 @@ mod tests {
message.name = "test".to_owned();
message.support_versions = server;
let mut messages = HashMap::new();
messages.insert("test".to_owned(), message);
messages.insert("test".to_owned(), (message, None));

let task = server_select(connect.unwrap(), messages)
.map(|(_, _, a)| {
Expand Down
3 changes: 2 additions & 1 deletion src/service/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
builder::{CodecFn, NameFn, SessionHandleFn},
builder::{CodecFn, NameFn, SelectVersionFn, SessionHandleFn},
traits::{Codec, ServiceProtocol, SessionProtocol},
yamux::config::Config as YamuxConfig,
ProtocolId, SessionId,
Expand Down Expand Up @@ -120,6 +120,7 @@ pub(crate) struct Meta {
pub(crate) name: NameFn,
pub(crate) support_versions: Vec<String>,
pub(crate) codec: CodecFn,
pub(crate) select_version: SelectVersionFn,
}

/// Protocol handle
Expand Down
3 changes: 2 additions & 1 deletion src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ where
.map(|proto_meta| {
let name = (proto_meta.name)(proto_meta.id);
let proto_info = ProtocolInfo::new(&name, proto_meta.support_versions.clone());
(name, proto_info)
let select_fn = (proto_meta.select_version)();
(name, (proto_info, select_fn))
})
.collect();

Expand Down