From 3b50cbd1b8dc58b72717f8c9c1893ab791db5deb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Mon, 27 Jan 2020 15:23:01 +0100 Subject: [PATCH] =?UTF-8?q?Addressing=20#473=20...=20if=20I=20understood?= =?UTF-8?q?=20the=20ticket=20right,=20we=20want=20to=20pass=E2=80=A6=20(#1?= =?UTF-8?q?395)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Addressing #473 ... if I understood the ticket right, we want to pass through whatever the application provides as a topic identifier, leaving hashing (or not hashing) up to the application. * Remove TopicDescriptor and use Topic newtype everywhere * PR feedback Use From instead of Into Use impl Into instead of Topic in public API Co-authored-by: Peat Bakke --- examples/chat.rs | 4 +- protocols/floodsub/src/layer.rs | 43 +++++++------- protocols/floodsub/src/lib.rs | 2 +- protocols/floodsub/src/protocol.rs | 16 +++--- protocols/floodsub/src/rpc.proto | 29 ---------- protocols/floodsub/src/topic.rs | 90 ++++-------------------------- 6 files changed, 44 insertions(+), 140 deletions(-) diff --git a/examples/chat.rs b/examples/chat.rs index 398c95a73fe..4606b5f6ca6 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -75,7 +75,7 @@ fn main() -> Result<(), Box> { let transport = libp2p::build_development_transport(local_key)?; // Create a Floodsub topic - let floodsub_topic = floodsub::TopicBuilder::new("chat").build(); + let floodsub_topic = floodsub::Topic::new("chat"); // We create a custom network behaviour that combines floodsub and mDNS. // In the future, we want to improve libp2p to make this easier to do. @@ -150,7 +150,7 @@ fn main() -> Result<(), Box> { task::block_on(future::poll_fn(move |cx: &mut Context| { loop { match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => swarm.floodsub.publish(&floodsub_topic, line.as_bytes()), + Poll::Ready(Some(line)) => swarm.floodsub.publish(floodsub_topic.clone(), line.as_bytes()), Poll::Ready(None) => panic!("Stdin closed"), Poll::Pending => break } diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 929ce6806ec..f9253f55474 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; -use crate::topic::{Topic, TopicHash}; +use crate::topic::Topic; use cuckoofilter::CuckooFilter; use fnv::FnvHashSet; use futures::prelude::*; @@ -52,7 +52,7 @@ pub struct Floodsub { /// List of peers the network is connected to, and the topics that they're subscribed to. // TODO: filter out peers that don't support floodsub, so that we avoid hammering them with // opened substreams - connected_peers: HashMap>, + connected_peers: HashMap>, // List of topics we're subscribed to. Necessary to filter out messages that we receive // erroneously. @@ -85,13 +85,13 @@ impl Floodsub { pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) { // Send our topics to this node if we're already connected to it. if self.connected_peers.contains_key(&peer_id) { - for topic in self.subscribed_topics.iter() { + for topic in self.subscribed_topics.iter().cloned() { self.events.push_back(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), event: FloodsubRpc { messages: Vec::new(), subscriptions: vec![FloodsubSubscription { - topic: topic.hash().clone(), + topic, action: FloodsubSubscriptionAction::Subscribe, }], }, @@ -116,7 +116,7 @@ impl Floodsub { /// /// Returns true if the subscription worked. Returns false if we were already subscribed. pub fn subscribe(&mut self, topic: Topic) -> bool { - if self.subscribed_topics.iter().any(|t| t.hash() == topic.hash()) { + if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) { return false; } @@ -126,7 +126,7 @@ impl Floodsub { event: FloodsubRpc { messages: Vec::new(), subscriptions: vec![FloodsubSubscription { - topic: topic.hash().clone(), + topic: topic.clone(), action: FloodsubSubscriptionAction::Subscribe, }], }, @@ -139,12 +139,11 @@ impl Floodsub { /// Unsubscribes from a topic. /// - /// Note that this only requires a `TopicHash` and not a full `Topic`. + /// Note that this only requires the topic name. /// /// Returns true if we were subscribed to this topic. - pub fn unsubscribe(&mut self, topic: impl AsRef) -> bool { - let topic = topic.as_ref(); - let pos = match self.subscribed_topics.iter().position(|t| t.hash() == topic) { + pub fn unsubscribe(&mut self, topic: Topic) -> bool { + let pos = match self.subscribed_topics.iter().position(|t| *t == topic) { Some(pos) => pos, None => return false }; @@ -168,12 +167,12 @@ impl Floodsub { } /// Publishes a message to the network, if we're subscribed to the topic only. - pub fn publish(&mut self, topic: impl Into, data: impl Into>) { + pub fn publish(&mut self, topic: impl Into, data: impl Into>) { self.publish_many(iter::once(topic), data) } /// Publishes a message to the network, even if we're not subscribed to the topic. - pub fn publish_any(&mut self, topic: impl Into, data: impl Into>) { + pub fn publish_any(&mut self, topic: impl Into, data: impl Into>) { self.publish_many_any(iter::once(topic), data) } @@ -181,16 +180,16 @@ impl Floodsub { /// /// /// > **Note**: Doesn't do anything if we're not subscribed to any of the topics. - pub fn publish_many(&mut self, topic: impl IntoIterator>, data: impl Into>) { + pub fn publish_many(&mut self, topic: impl IntoIterator>, data: impl Into>) { self.publish_many_inner(topic, data, true) } /// Publishes a message with multiple topics to the network, even if we're not subscribed to any of the topics. - pub fn publish_many_any(&mut self, topic: impl IntoIterator>, data: impl Into>) { + pub fn publish_many_any(&mut self, topic: impl IntoIterator>, data: impl Into>) { self.publish_many_inner(topic, data, false) } - fn publish_many_inner(&mut self, topic: impl IntoIterator>, data: impl Into>, check_self_subscriptions: bool) { + fn publish_many_inner(&mut self, topic: impl IntoIterator>, data: impl Into>, check_self_subscriptions: bool) { let message = FloodsubMessage { source: self.local_peer_id.clone(), data: data.into(), @@ -198,10 +197,10 @@ impl Floodsub { // with packets with the predetermined sequence numbers and absorb our legitimate // messages. We therefore use a random number. sequence_number: rand::random::<[u8; 20]>().to_vec(), - topics: topic.into_iter().map(|t| t.into().clone()).collect(), + topics: topic.into_iter().map(Into::into).collect(), }; - let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)); + let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)); if self_subscribed { self.received.add(&message); } @@ -246,13 +245,13 @@ where fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { // We need to send our subscriptions to the newly-connected node. if self.target_peers.contains(&id) { - for topic in self.subscribed_topics.iter() { + for topic in self.subscribed_topics.iter().cloned() { self.events.push_back(NetworkBehaviourAction::SendEvent { peer_id: id.clone(), event: FloodsubRpc { messages: Vec::new(), subscriptions: vec![FloodsubSubscription { - topic: topic.hash().clone(), + topic, action: FloodsubSubscriptionAction::Subscribe, }], }, @@ -323,7 +322,7 @@ where } // Add the message to be dispatched to the user. - if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)) { + if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) { let event = FloodsubEvent::Message(message.clone()); self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } @@ -408,7 +407,7 @@ pub enum FloodsubEvent { /// Remote that has subscribed. peer_id: PeerId, /// The topic it has subscribed to. - topic: TopicHash, + topic: Topic, }, /// A remote unsubscribed from a topic. @@ -416,6 +415,6 @@ pub enum FloodsubEvent { /// Remote that has unsubscribed. peer_id: PeerId, /// The topic it has subscribed from. - topic: TopicHash, + topic: Topic, }, } diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index d83baf44489..cc9e840af79 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -32,4 +32,4 @@ mod rpc_proto { pub use self::layer::{Floodsub, FloodsubEvent}; pub use self::protocol::{FloodsubMessage, FloodsubRpc}; -pub use self::topic::{Topic, TopicBuilder, TopicHash}; +pub use self::topic::Topic; diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index e8cec0cc596..4df3975eddb 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -19,11 +19,11 @@ // DEALINGS IN THE SOFTWARE. use crate::rpc_proto; -use crate::topic::TopicHash; -use futures::prelude::*; +use crate::topic::Topic; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade}; use prost::Message; use std::{error, fmt, io, iter, pin::Pin}; +use futures::{Future, io::{AsyncRead, AsyncWrite}}; /// Implementation of `ConnectionUpgrade` for the floodsub protocol. #[derive(Debug, Clone, Default)] @@ -68,7 +68,7 @@ where sequence_number: publish.seqno.unwrap_or_default(), topics: publish.topic_ids .into_iter() - .map(TopicHash::from_raw) + .map(Topic::new) .collect(), }); } @@ -83,7 +83,7 @@ where } else { FloodsubSubscriptionAction::Unsubscribe }, - topic: TopicHash::from_raw(sub.topic_id.unwrap_or_default()), + topic: Topic::new(sub.topic_id.unwrap_or_default()), }) .collect(), }) @@ -184,7 +184,7 @@ impl FloodsubRpc { seqno: Some(msg.sequence_number), topic_ids: msg.topics .into_iter() - .map(TopicHash::into_string) + .map(|topic| topic.into()) .collect() } }) @@ -194,7 +194,7 @@ impl FloodsubRpc { .map(|topic| { rpc_proto::rpc::SubOpts { subscribe: Some(topic.action == FloodsubSubscriptionAction::Subscribe), - topic_id: Some(topic.topic.into_string()) + topic_id: Some(topic.topic.into()) } }) .collect() @@ -221,7 +221,7 @@ pub struct FloodsubMessage { /// List of topics this message belongs to. /// /// Each message can belong to multiple topics at once. - pub topics: Vec, + pub topics: Vec, } /// A subscription received by the floodsub system. @@ -230,7 +230,7 @@ pub struct FloodsubSubscription { /// Action to perform. pub action: FloodsubSubscriptionAction, /// The topic from which to subscribe or unsubscribe. - pub topic: TopicHash, + pub topic: Topic, } /// Action that a subscription wants to perform. diff --git a/protocols/floodsub/src/rpc.proto b/protocols/floodsub/src/rpc.proto index 96f683df9fb..84f0ea51795 100644 --- a/protocols/floodsub/src/rpc.proto +++ b/protocols/floodsub/src/rpc.proto @@ -18,32 +18,3 @@ message Message { optional bytes seqno = 3; repeated string topic_ids = 4; } - -// topicID = hash(topicDescriptor); (not the topic.name) -message TopicDescriptor { - optional string name = 1; - optional AuthOpts auth = 2; - optional EncOpts enc = 3; - - message AuthOpts { - optional AuthMode mode = 1; - repeated bytes keys = 2; // root keys to trust - - enum AuthMode { - NONE = 0; // no authentication, anyone can publish - KEY = 1; // only messages signed by keys in the topic descriptor are accepted - WOT = 2; // web of trust, certificates can allow publisher set to grow - } - } - - message EncOpts { - optional EncMode mode = 1; - repeated bytes key_hashes = 2; // the hashes of the shared keys used (salted) - - enum EncMode { - NONE = 0; // no encryption, anyone can read - SHAREDKEY = 1; // messages are encrypted with shared key - WOT = 2; // web of trust, certificates can allow publisher set to grow - } - } -} diff --git a/protocols/floodsub/src/topic.rs b/protocols/floodsub/src/topic.rs index 75f6d807643..daca5637952 100644 --- a/protocols/floodsub/src/topic.rs +++ b/protocols/floodsub/src/topic.rs @@ -18,93 +18,27 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use bs58; -use crate::rpc_proto; -use prost::Message; - -/// Represents the hash of a topic. -/// -/// Instead of a using the topic as a whole, the API of floodsub uses a hash of the topic. You only -/// have to build the hash once, then use it everywhere. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct TopicHash { - hash: String, -} - -impl TopicHash { - /// Builds a new `TopicHash` from the given hash. - pub fn from_raw(hash: String) -> TopicHash { - TopicHash { hash } - } - - pub fn into_string(self) -> String { - self.hash - } -} - /// Built topic. -#[derive(Debug, Clone)] -pub struct Topic { - descriptor: rpc_proto::TopicDescriptor, - hash: TopicHash, -} +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Topic(String); impl Topic { - /// Returns the hash of the topic. - pub fn hash(&self) -> &TopicHash { - &self.hash - } -} - -impl AsRef for Topic { - fn as_ref(&self) -> &TopicHash { - &self.hash - } -} - -impl From for TopicHash { - fn from(topic: Topic) -> TopicHash { - topic.hash - } -} - -impl<'a> From<&'a Topic> for TopicHash { - fn from(topic: &'a Topic) -> TopicHash { - topic.hash.clone() + /// Returns the id of the topic. + #[inline] + pub fn id(&self) -> &str { + &self.0 } -} -/// Builder for a `TopicHash`. -#[derive(Debug, Clone)] -pub struct TopicBuilder { - builder: rpc_proto::TopicDescriptor, -} - -impl TopicBuilder { - pub fn new(name: S) -> TopicBuilder + pub fn new(name: S) -> Topic where S: Into, { - TopicBuilder { - builder: rpc_proto::TopicDescriptor { - name: Some(name.into()), - auth: None, - enc: None - } - } + Topic(name.into()) } +} - /// Turns the builder into an actual `Topic`. - pub fn build(self) -> Topic { - let mut buf = Vec::with_capacity(self.builder.encoded_len()); - self.builder.encode(&mut buf).expect("Vec provides capacity as needed"); - // TODO: https://github.com/libp2p/rust-libp2p/issues/473 - let hash = TopicHash { - hash: bs58::encode(&buf).into_string(), - }; - Topic { - descriptor: self.builder, - hash, - } +impl From for String { + fn from(topic: Topic) -> String { + topic.0 } }