Skip to content

Commit

Permalink
Addressing libp2p#473 ... if I understood the ticket right, we want t…
Browse files Browse the repository at this point in the history
…o pass through whatever the application provides as a topic identifier, leaving hashing (or not hashing) up to the application.
  • Loading branch information
peat authored and rklaehn committed Jan 23, 2020
1 parent d8bafc1 commit 6ef6e32
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 104 deletions.
4 changes: 2 additions & 2 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let transport = libp2p::build_development_transport(local_key)?;

// Create a Floodsub topic
let floodsub_topic = floodsub::TopicBuilder::new("chat").build();
let floodsub_topic = floodsub::Topic::new("chat");

// We create a custom network behaviour that combines floodsub and mDNS.
// In the future, we want to improve libp2p to make this easier to do.
Expand Down Expand Up @@ -150,7 +150,7 @@ fn main() -> Result<(), Box<dyn Error>> {
task::block_on(future::poll_fn(move |cx: &mut Context| {
loop {
match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => swarm.floodsub.publish(&floodsub_topic, line.as_bytes()),
Poll::Ready(Some(line)) => swarm.floodsub.publish(floodsub_topic.id().to_owned(), line.as_bytes()),
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break
}
Expand Down
39 changes: 19 additions & 20 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use crate::topic::{Topic, TopicHash};
use crate::topic::Topic;
use cuckoofilter::CuckooFilter;
use fnv::FnvHashSet;
use futures::prelude::*;
Expand Down Expand Up @@ -52,7 +52,7 @@ pub struct Floodsub<TSubstream> {
/// List of peers the network is connected to, and the topics that they're subscribed to.
// TODO: filter out peers that don't support floodsub, so that we avoid hammering them with
// opened substreams
connected_peers: HashMap<PeerId, SmallVec<[TopicHash; 8]>>,
connected_peers: HashMap<PeerId, SmallVec<[String; 8]>>,

// List of topics we're subscribed to. Necessary to filter out messages that we receive
// erroneously.
Expand Down Expand Up @@ -91,7 +91,7 @@ impl<TSubstream> Floodsub<TSubstream> {
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
topic: topic.hash().clone(),
topic: topic.id().to_owned(),
action: FloodsubSubscriptionAction::Subscribe,
}],
},
Expand All @@ -116,7 +116,7 @@ impl<TSubstream> Floodsub<TSubstream> {
///
/// Returns true if the subscription worked. Returns false if we were already subscribed.
pub fn subscribe(&mut self, topic: Topic) -> bool {
if self.subscribed_topics.iter().any(|t| t.hash() == topic.hash()) {
if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
return false;
}

Expand All @@ -126,7 +126,7 @@ impl<TSubstream> Floodsub<TSubstream> {
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
topic: topic.hash().clone(),
topic: topic.id().to_owned(),
action: FloodsubSubscriptionAction::Subscribe,
}],
},
Expand All @@ -139,12 +139,11 @@ impl<TSubstream> Floodsub<TSubstream> {

/// Unsubscribes from a topic.
///
/// Note that this only requires a `TopicHash` and not a full `Topic`.
/// Note that this only requires the topic name.
///
/// Returns true if we were subscribed to this topic.
pub fn unsubscribe(&mut self, topic: impl AsRef<TopicHash>) -> bool {
let topic = topic.as_ref();
let pos = match self.subscribed_topics.iter().position(|t| t.hash() == topic) {
pub fn unsubscribe(&mut self, topic: String) -> bool {
let pos = match self.subscribed_topics.iter().position(|t| t.id() == topic) {
Some(pos) => pos,
None => return false
};
Expand All @@ -168,40 +167,40 @@ impl<TSubstream> Floodsub<TSubstream> {
}

/// Publishes a message to the network, if we're subscribed to the topic only.
pub fn publish(&mut self, topic: impl Into<TopicHash>, data: impl Into<Vec<u8>>) {
pub fn publish(&mut self, topic: String, data: impl Into<Vec<u8>>) {
self.publish_many(iter::once(topic), data)
}

/// Publishes a message to the network, even if we're not subscribed to the topic.
pub fn publish_any(&mut self, topic: impl Into<TopicHash>, data: impl Into<Vec<u8>>) {
pub fn publish_any(&mut self, topic: String, data: impl Into<Vec<u8>>) {
self.publish_many_any(iter::once(topic), data)
}

/// Publishes a message with multiple topics to the network.
///
///
/// > **Note**: Doesn't do anything if we're not subscribed to any of the topics.
pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<TopicHash>>, data: impl Into<Vec<u8>>) {
pub fn publish_many(&mut self, topic: impl IntoIterator<Item = String>, data: impl Into<Vec<u8>>) {
self.publish_many_inner(topic, data, true)
}

/// Publishes a message with multiple topics to the network, even if we're not subscribed to any of the topics.
pub fn publish_many_any(&mut self, topic: impl IntoIterator<Item = impl Into<TopicHash>>, data: impl Into<Vec<u8>>) {
pub fn publish_many_any(&mut self, topic: impl IntoIterator<Item = String>, data: impl Into<Vec<u8>>) {
self.publish_many_inner(topic, data, false)
}

fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<TopicHash>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = String>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
let message = FloodsubMessage {
source: self.local_peer_id.clone(),
data: data.into(),
// If the sequence numbers are predictable, then an attacker could flood the network
// with packets with the predetermined sequence numbers and absorb our legitimate
// messages. We therefore use a random number.
sequence_number: rand::random::<[u8; 20]>().to_vec(),
topics: topic.into_iter().map(|t| t.into().clone()).collect(),
topics: topic.into_iter().map(|t| t.clone()).collect(),
};

let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u));
let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.id() == u));
if self_subscribed {
self.received.add(&message);
}
Expand Down Expand Up @@ -252,7 +251,7 @@ where
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
topic: topic.hash().clone(),
topic: topic.id().to_owned(),
action: FloodsubSubscriptionAction::Subscribe,
}],
},
Expand Down Expand Up @@ -323,7 +322,7 @@ where
}

// Add the message to be dispatched to the user.
if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.hash() == u)) {
if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t.id() == u)) {
let event = FloodsubEvent::Message(message.clone());
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
Expand Down Expand Up @@ -408,14 +407,14 @@ pub enum FloodsubEvent {
/// Remote that has subscribed.
peer_id: PeerId,
/// The topic it has subscribed to.
topic: TopicHash,
topic: String,
},

/// A remote unsubscribed from a topic.
Unsubscribed {
/// Remote that has unsubscribed.
peer_id: PeerId,
/// The topic it has subscribed from.
topic: TopicHash,
topic: String,
},
}
2 changes: 1 addition & 1 deletion protocols/floodsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ mod rpc_proto {

pub use self::layer::{Floodsub, FloodsubEvent};
pub use self::protocol::{FloodsubMessage, FloodsubRpc};
pub use self::topic::{Topic, TopicBuilder, TopicHash};
pub use self::topic::Topic;
13 changes: 5 additions & 8 deletions protocols/floodsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
// DEALINGS IN THE SOFTWARE.

use crate::rpc_proto;
use crate::topic::TopicHash;
use futures::prelude::*;
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade};
use prost::Message;
use std::{error, fmt, io, iter, pin::Pin};
use futures::{Future, io::{AsyncRead, AsyncWrite}};

/// Implementation of `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -68,7 +67,6 @@ where
sequence_number: publish.seqno.unwrap_or_default(),
topics: publish.topic_ids
.into_iter()
.map(TopicHash::from_raw)
.collect(),
});
}
Expand All @@ -83,7 +81,7 @@ where
} else {
FloodsubSubscriptionAction::Unsubscribe
},
topic: TopicHash::from_raw(sub.topic_id.unwrap_or_default()),
topic: sub.topic_id.unwrap_or_default(),
})
.collect(),
})
Expand Down Expand Up @@ -184,7 +182,6 @@ impl FloodsubRpc {
seqno: Some(msg.sequence_number),
topic_ids: msg.topics
.into_iter()
.map(TopicHash::into_string)
.collect()
}
})
Expand All @@ -194,7 +191,7 @@ impl FloodsubRpc {
.map(|topic| {
rpc_proto::rpc::SubOpts {
subscribe: Some(topic.action == FloodsubSubscriptionAction::Subscribe),
topic_id: Some(topic.topic.into_string())
topic_id: Some(topic.topic)
}
})
.collect()
Expand All @@ -221,7 +218,7 @@ pub struct FloodsubMessage {
/// List of topics this message belongs to.
///
/// Each message can belong to multiple topics at once.
pub topics: Vec<TopicHash>,
pub topics: Vec<String>,
}

/// A subscription received by the floodsub system.
Expand All @@ -230,7 +227,7 @@ pub struct FloodsubSubscription {
/// Action to perform.
pub action: FloodsubSubscriptionAction,
/// The topic from which to subscribe or unsubscribe.
pub topic: TopicHash,
pub topic: String,
}

/// Action that a subscription wants to perform.
Expand Down
83 changes: 10 additions & 73 deletions protocols/floodsub/src/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,93 +18,30 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use bs58;
use crate::rpc_proto;
use prost::Message;

/// Represents the hash of a topic.
///
/// Instead of a using the topic as a whole, the API of floodsub uses a hash of the topic. You only
/// have to build the hash once, then use it everywhere.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicHash {
hash: String,
}

impl TopicHash {
/// Builds a new `TopicHash` from the given hash.
pub fn from_raw(hash: String) -> TopicHash {
TopicHash { hash }
}

pub fn into_string(self) -> String {
self.hash
}
}

/// Built topic.
#[derive(Debug, Clone)]
pub struct Topic {
descriptor: rpc_proto::TopicDescriptor,
hash: TopicHash,
}

impl Topic {
/// Returns the hash of the topic.
pub fn hash(&self) -> &TopicHash {
&self.hash
}
}

impl AsRef<TopicHash> for Topic {
fn as_ref(&self) -> &TopicHash {
&self.hash
}
}

impl From<Topic> for TopicHash {
fn from(topic: Topic) -> TopicHash {
topic.hash
}
}

impl<'a> From<&'a Topic> for TopicHash {
fn from(topic: &'a Topic) -> TopicHash {
topic.hash.clone()
/// Returns the id of the topic.
#[inline]
pub fn id(&self) -> &str {
&self.descriptor.name.as_deref().unwrap_or_default()
}
}

/// Builder for a `TopicHash`.
#[derive(Debug, Clone)]
pub struct TopicBuilder {
builder: rpc_proto::TopicDescriptor,
}

impl TopicBuilder {
pub fn new<S>(name: S) -> TopicBuilder
pub fn new<S>(name: S) -> Topic
where
S: Into<String>,
{
TopicBuilder {
builder: rpc_proto::TopicDescriptor {
name: Some(name.into()),
auth: None,
enc: None
}
}
}

/// Turns the builder into an actual `Topic`.
pub fn build(self) -> Topic {
let mut buf = Vec::with_capacity(self.builder.encoded_len());
self.builder.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
// TODO: https://github.com/libp2p/rust-libp2p/issues/473
let hash = TopicHash {
hash: bs58::encode(&buf).into_string(),
let descriptor = rpc_proto::TopicDescriptor {
name: Some(name.into()),
enc: None,
auth: None,
};
Topic {
descriptor: self.builder,
hash,
}
Topic { descriptor }
}
}

0 comments on commit 6ef6e32

Please sign in to comment.