From 6ef6e32aa8a0f9edad320bd62507c4f911eb0b8e Mon Sep 17 00:00:00 2001 From: Peat Bakke Date: Thu, 17 Oct 2019 23:26:05 -0700 Subject: [PATCH] Addressing #473 ... if I understood the ticket right, we want to pass through whatever the application provides as a topic identifier, leaving hashing (or not hashing) up to the application. --- examples/chat.rs | 4 +- protocols/floodsub/src/layer.rs | 39 +++++++------- protocols/floodsub/src/lib.rs | 2 +- protocols/floodsub/src/protocol.rs | 13 ++--- protocols/floodsub/src/topic.rs | 83 ++++-------------------------- 5 files changed, 37 insertions(+), 104 deletions(-) diff --git a/examples/chat.rs b/examples/chat.rs index 398c95a73fe0..23c48f574e0b 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -75,7 +75,7 @@ fn main() -> Result<(), Box> { let transport = libp2p::build_development_transport(local_key)?; // Create a Floodsub topic - let floodsub_topic = floodsub::TopicBuilder::new("chat").build(); + let floodsub_topic = floodsub::Topic::new("chat"); // We create a custom network behaviour that combines floodsub and mDNS. // In the future, we want to improve libp2p to make this easier to do. @@ -150,7 +150,7 @@ fn main() -> Result<(), Box> { task::block_on(future::poll_fn(move |cx: &mut Context| { loop { match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => swarm.floodsub.publish(&floodsub_topic, line.as_bytes()), + Poll::Ready(Some(line)) => swarm.floodsub.publish(floodsub_topic.id().to_owned(), line.as_bytes()), Poll::Ready(None) => panic!("Stdin closed"), Poll::Pending => break } diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 929ce6806ecc..7323ed5882c8 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; -use crate::topic::{Topic, TopicHash}; +use crate::topic::Topic; use cuckoofilter::CuckooFilter; use fnv::FnvHashSet; use futures::prelude::*; @@ -52,7 +52,7 @@ pub struct Floodsub { /// List of peers the network is connected to, and the topics that they're subscribed to. // TODO: filter out peers that don't support floodsub, so that we avoid hammering them with // opened substreams - connected_peers: HashMap>, + connected_peers: HashMap>, // List of topics we're subscribed to. Necessary to filter out messages that we receive // erroneously. @@ -91,7 +91,7 @@ impl Floodsub { event: FloodsubRpc { messages: Vec::new(), subscriptions: vec![FloodsubSubscription { - topic: topic.hash().clone(), + topic: topic.id().to_owned(), action: FloodsubSubscriptionAction::Subscribe, }], }, @@ -116,7 +116,7 @@ impl Floodsub { /// /// Returns true if the subscription worked. Returns false if we were already subscribed. pub fn subscribe(&mut self, topic: Topic) -> bool { - if self.subscribed_topics.iter().any(|t| t.hash() == topic.hash()) { + if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) { return false; } @@ -126,7 +126,7 @@ impl Floodsub { event: FloodsubRpc { messages: Vec::new(), subscriptions: vec![FloodsubSubscription { - topic: topic.hash().clone(), + topic: topic.id().to_owned(), action: FloodsubSubscriptionAction::Subscribe, }], }, @@ -139,12 +139,11 @@ impl Floodsub { /// Unsubscribes from a topic. /// - /// Note that this only requires a `TopicHash` and not a full `Topic`. + /// Note that this only requires the topic name. /// /// Returns true if we were subscribed to this topic. - pub fn unsubscribe(&mut self, topic: impl AsRef) -> bool { - let topic = topic.as_ref(); - let pos = match self.subscribed_topics.iter().position(|t| t.hash() == topic) { + pub fn unsubscribe(&mut self, topic: String) -> bool { + let pos = match self.subscribed_topics.iter().position(|t| t.id() == topic) { Some(pos) => pos, None => return false }; @@ -168,12 +167,12 @@ impl Floodsub { } /// Publishes a message to the network, if we're subscribed to the topic only. - pub fn publish(&mut self, topic: impl Into, data: impl Into>) { + pub fn publish(&mut self, topic: String, data: impl Into>) { self.publish_many(iter::once(topic), data) } /// Publishes a message to the network, even if we're not subscribed to the topic. - pub fn publish_any(&mut self, topic: impl Into, data: impl Into>) { + pub fn publish_any(&mut self, topic: String, data: impl Into>) { self.publish_many_any(iter::once(topic), data) } @@ -181,16 +180,16 @@ impl Floodsub { /// /// /// > **Note**: Doesn't do anything if we're not subscribed to any of the topics. - pub fn publish_many(&mut self, topic: impl IntoIterator>, data: impl Into>) { + pub fn publish_many(&mut self, topic: impl IntoIterator, data: impl Into>) { self.publish_many_inner(topic, data, true) } /// Publishes a message with multiple topics to the network, even if we're not subscribed to any of the topics. - pub fn publish_many_any(&mut self, topic: impl IntoIterator>, data: impl Into>) { + pub fn publish_many_any(&mut self, topic: impl IntoIterator, data: impl Into>) { self.publish_many_inner(topic, data, false) } - fn publish_many_inner(&mut self, topic: impl IntoIterator>, data: impl Into>, check_self_subscriptions: bool) { + fn publish_many_inner(&mut self, topic: impl IntoIterator, data: impl Into>, check_self_subscriptions: bool) { let message = FloodsubMessage { source: self.local_peer_id.clone(), data: data.into(), @@ -198,10 +197,10 @@ impl Floodsub { // with packets with the predetermined sequence numbers and absorb our legitimate // messages. We therefore use a random number. sequence_number: rand::random::<[u8; 20]>().to_vec(), - topics: topic.into_iter().map(|t| t.into().clone()).collect(), + topics: topic.into_iter().map(|t| t.clone()).collect(), }; - let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)); + let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.id() == u)); if self_subscribed { self.received.add(&message); } @@ -252,7 +251,7 @@ where event: FloodsubRpc { messages: Vec::new(), subscriptions: vec![FloodsubSubscription { - topic: topic.hash().clone(), + topic: topic.id().to_owned(), action: FloodsubSubscriptionAction::Subscribe, }], }, @@ -323,7 +322,7 @@ where } // Add the message to be dispatched to the user. - if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)) { + if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.id() == u)) { let event = FloodsubEvent::Message(message.clone()); self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } @@ -408,7 +407,7 @@ pub enum FloodsubEvent { /// Remote that has subscribed. peer_id: PeerId, /// The topic it has subscribed to. - topic: TopicHash, + topic: String, }, /// A remote unsubscribed from a topic. @@ -416,6 +415,6 @@ pub enum FloodsubEvent { /// Remote that has unsubscribed. peer_id: PeerId, /// The topic it has subscribed from. - topic: TopicHash, + topic: String, }, } diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index d83baf444896..cc9e840af792 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -32,4 +32,4 @@ mod rpc_proto { pub use self::layer::{Floodsub, FloodsubEvent}; pub use self::protocol::{FloodsubMessage, FloodsubRpc}; -pub use self::topic::{Topic, TopicBuilder, TopicHash}; +pub use self::topic::Topic; diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index e8cec0cc5967..7e8924be7cde 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -19,11 +19,10 @@ // DEALINGS IN THE SOFTWARE. use crate::rpc_proto; -use crate::topic::TopicHash; -use futures::prelude::*; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade}; use prost::Message; use std::{error, fmt, io, iter, pin::Pin}; +use futures::{Future, io::{AsyncRead, AsyncWrite}}; /// Implementation of `ConnectionUpgrade` for the floodsub protocol. #[derive(Debug, Clone, Default)] @@ -68,7 +67,6 @@ where sequence_number: publish.seqno.unwrap_or_default(), topics: publish.topic_ids .into_iter() - .map(TopicHash::from_raw) .collect(), }); } @@ -83,7 +81,7 @@ where } else { FloodsubSubscriptionAction::Unsubscribe }, - topic: TopicHash::from_raw(sub.topic_id.unwrap_or_default()), + topic: sub.topic_id.unwrap_or_default(), }) .collect(), }) @@ -184,7 +182,6 @@ impl FloodsubRpc { seqno: Some(msg.sequence_number), topic_ids: msg.topics .into_iter() - .map(TopicHash::into_string) .collect() } }) @@ -194,7 +191,7 @@ impl FloodsubRpc { .map(|topic| { rpc_proto::rpc::SubOpts { subscribe: Some(topic.action == FloodsubSubscriptionAction::Subscribe), - topic_id: Some(topic.topic.into_string()) + topic_id: Some(topic.topic) } }) .collect() @@ -221,7 +218,7 @@ pub struct FloodsubMessage { /// List of topics this message belongs to. /// /// Each message can belong to multiple topics at once. - pub topics: Vec, + pub topics: Vec, } /// A subscription received by the floodsub system. @@ -230,7 +227,7 @@ pub struct FloodsubSubscription { /// Action to perform. pub action: FloodsubSubscriptionAction, /// The topic from which to subscribe or unsubscribe. - pub topic: TopicHash, + pub topic: String, } /// Action that a subscription wants to perform. diff --git a/protocols/floodsub/src/topic.rs b/protocols/floodsub/src/topic.rs index 75f6d8076434..0bf0c4ce6af0 100644 --- a/protocols/floodsub/src/topic.rs +++ b/protocols/floodsub/src/topic.rs @@ -18,93 +18,30 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use bs58; use crate::rpc_proto; -use prost::Message; - -/// Represents the hash of a topic. -/// -/// Instead of a using the topic as a whole, the API of floodsub uses a hash of the topic. You only -/// have to build the hash once, then use it everywhere. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct TopicHash { - hash: String, -} - -impl TopicHash { - /// Builds a new `TopicHash` from the given hash. - pub fn from_raw(hash: String) -> TopicHash { - TopicHash { hash } - } - - pub fn into_string(self) -> String { - self.hash - } -} /// Built topic. #[derive(Debug, Clone)] pub struct Topic { descriptor: rpc_proto::TopicDescriptor, - hash: TopicHash, } impl Topic { - /// Returns the hash of the topic. - pub fn hash(&self) -> &TopicHash { - &self.hash - } -} - -impl AsRef for Topic { - fn as_ref(&self) -> &TopicHash { - &self.hash - } -} - -impl From for TopicHash { - fn from(topic: Topic) -> TopicHash { - topic.hash - } -} - -impl<'a> From<&'a Topic> for TopicHash { - fn from(topic: &'a Topic) -> TopicHash { - topic.hash.clone() + /// Returns the id of the topic. + #[inline] + pub fn id(&self) -> &str { + &self.descriptor.name.as_deref().unwrap_or_default() } -} -/// Builder for a `TopicHash`. -#[derive(Debug, Clone)] -pub struct TopicBuilder { - builder: rpc_proto::TopicDescriptor, -} - -impl TopicBuilder { - pub fn new(name: S) -> TopicBuilder + pub fn new(name: S) -> Topic where S: Into, { - TopicBuilder { - builder: rpc_proto::TopicDescriptor { - name: Some(name.into()), - auth: None, - enc: None - } - } - } - - /// Turns the builder into an actual `Topic`. - pub fn build(self) -> Topic { - let mut buf = Vec::with_capacity(self.builder.encoded_len()); - self.builder.encode(&mut buf).expect("Vec provides capacity as needed"); - // TODO: https://github.com/libp2p/rust-libp2p/issues/473 - let hash = TopicHash { - hash: bs58::encode(&buf).into_string(), + let descriptor = rpc_proto::TopicDescriptor { + name: Some(name.into()), + enc: None, + auth: None, }; - Topic { - descriptor: self.builder, - hash, - } + Topic { descriptor } } }