Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gossipsub): remove fast_message_id_fn #4285

Merged
2 changes: 2 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## 0.46.0 - unreleased

- Remove `fast_message_id_fn` mechanism from `Config`.
See [PR 4285](https://github.com/libp2p/rust-libp2p/pull/4285).
- Remove deprecated `gossipsub::Config::idle_timeout` in favor of `SwarmBuilder::idle_connection_timeout`.
See [PR 4642](https://github.com/libp2p/rust-libp2p/pull/4642).
- Return typed error from config builder.
Expand Down
58 changes: 9 additions & 49 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
use crate::time_cache::{DuplicateCache, TimeCache};
use crate::time_cache::DuplicateCache;
use crate::topic::{Hasher, Topic, TopicHash};
use crate::transform::{DataTransform, IdentityTransform};
use crate::types::{
ControlAction, FastMessageId, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage,
Subscription, SubscriptionAction,
ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription,
SubscriptionAction,
};
use crate::types::{PeerConnections, PeerKind, Rpc};
use crate::{rpc_proto::proto, TopicScoreParams};
Expand Down Expand Up @@ -323,9 +323,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// our own messages back if the messages are anonymous or use a random author.
published_message_ids: DuplicateCache<MessageId>,

/// Short term cache for fast message ids mapping them to the real message ids
fast_message_id_cache: TimeCache<FastMessageId, MessageId>,

/// The filter used to handle message subscriptions.
subscription_filter: F,

Expand Down Expand Up @@ -446,7 +443,6 @@ where
control_pool: HashMap::new(),
publish_config: privacy.into(),
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
fast_message_id_cache: TimeCache::new(config.duplicate_cache_time()),
topic_peers: HashMap::new(),
peer_topics: HashMap::new(),
explicit_peers: HashSet::new(),
Expand Down Expand Up @@ -1755,31 +1751,6 @@ where
metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
}

let fast_message_id = self.config.fast_message_id(&raw_message);

if let Some(fast_message_id) = fast_message_id.as_ref() {
if let Some(msg_id) = self.fast_message_id_cache.get(fast_message_id) {
let msg_id = msg_id.clone();
// Report the duplicate
if self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.duplicated_message(
propagation_source,
&msg_id,
&raw_message.topic,
);
}
// Update the cache, informing that we have received a duplicate from another peer.
// The peers in this cache are used to prevent us forwarding redundant messages onto
// these peers.
self.mcache.observe_duplicate(&msg_id, propagation_source);
}

// This message has been seen previously. Ignore it
return;
}
}

// Try and perform the data transform to the message. If it fails, consider it invalid.
let message = match self.data_transform.inbound_transform(raw_message.clone()) {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
Ok(message) => message,
Expand All @@ -1805,14 +1776,6 @@ where
return;
}

// Add the message to the duplicate caches
if let Some(fast_message_id) = fast_message_id {
// add id to cache
self.fast_message_id_cache
.entry(fast_message_id)
.or_insert_with(|| msg_id.clone());
}

if !self.duplicate_cache.insert(msg_id.clone()) {
debug!("Message already received, ignoring. Message: {}", msg_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
Expand Down Expand Up @@ -1887,20 +1850,17 @@ where
metrics.register_invalid_message(&raw_message.topic);
}

let fast_message_id_cache = &self.fast_message_id_cache;
if let Ok(message) = self.data_transform.inbound_transform(raw_message.clone()) {
let message_id = self.config.message_id(&message);
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

if let Some(msg_id) = self
.config
.fast_message_id(raw_message)
.and_then(|id| fast_message_id_cache.get(&id))
{
peer_score.reject_message(
propagation_source,
msg_id,
&raw_message.topic,
&message_id,
&message.topic,
reject_reason,
);
gossip_promises.reject_message(msg_id, &reject_reason);

gossip_promises.reject_message(&message_id, &reject_reason);
} else {
// The message is invalid, we reject it ignoring any gossip promises. If a peer is
// advertising this message via an IHAVE and it's invalid it will be double
Expand Down
87 changes: 1 addition & 86 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,12 @@ use super::*;
use crate::protocol::ProtocolConfig;
use crate::subscription_filter::WhitelistSubscriptionFilter;
use crate::transform::{DataTransform, IdentityTransform};
use crate::types::FastMessageId;
use crate::ValidationError;
use crate::{
config::Config, config::ConfigBuilder, IdentTopic as Topic, Message, TopicScoreParams,
};
use crate::{config::Config, config::ConfigBuilder, IdentTopic as Topic, TopicScoreParams};
use async_std::net::Ipv4Addr;
use byteorder::{BigEndian, ByteOrder};
use libp2p_core::{ConnectedPoint, Endpoint};
use rand::Rng;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::thread::sleep;
use std::time::Duration;

Expand Down Expand Up @@ -5064,86 +5059,6 @@ fn test_public_api() {
);
}

// Checks that, when a fast message id function is configured, receiving the same raw
// message five times invokes the fast id function five times but the regular (slow)
// message id function only once.
#[test]
fn test_msg_id_fn_only_called_once_with_fast_message_ids() {
// Call counters that the two id functions below increment.
struct Pointers {
slow_counter: u32,
fast_counter: u32,
}

let mut counters = Pointers {
slow_counter: 0,
fast_counter: 0,
};

// The id functions do not capture `counters`; instead its address is carried inside the
// message payload and turned back into a pointer when an id function runs.
// NOTE(review): presumably done because the id closures must be `Send + Sync + 'static`
// and so cannot borrow a local — confirm.
let counters_pointer: *mut Pointers = &mut counters;

let counters_address = counters_pointer as u64;

// Decodes the 8 big-endian bytes in `$m` back into a `*mut Pointers`.
// `copy_from_slice` panics unless `$m` is exactly 8 bytes long.
macro_rules! get_counters_pointer {
($m: expr) => {{
let mut address_bytes: [u8; 8] = Default::default();
address_bytes.copy_from_slice($m.as_slice());
let address = u64::from_be_bytes(address_bytes);
address as *mut Pointers
}};
}

// Produces `(id, pointer)`: the id is built from the `DefaultHasher` hash of `$m`, the
// pointer is recovered from the bytes of `$m` itself.
macro_rules! get_counters_and_hash {
($m: expr) => {{
let mut hasher = DefaultHasher::new();
$m.hash(&mut hasher);
let id = hasher.finish().to_be_bytes().into();
(id, get_counters_pointer!($m))
}};
}

// Slow path: counts its invocations and returns the hash bytes in reversed order.
let message_id_fn = |m: &Message| -> MessageId {
let (mut id, counters_pointer): (MessageId, *mut Pointers) =
get_counters_and_hash!(&m.data);
// SAFETY (as relied on by this test): the pointer was decoded from the address of the
// local `counters`, which is still alive while messages are handled below.
unsafe {
(*counters_pointer).slow_counter += 1;
}
id.0.reverse();
id
};
// Fast path: counts its invocations and returns the hash bytes unmodified.
let fast_message_id_fn = |m: &RawMessage| -> FastMessageId {
let (id, counters_pointer) = get_counters_and_hash!(&m.data);
// SAFETY (as relied on by this test): same pointer provenance as in `message_id_fn`.
unsafe {
(*counters_pointer).fast_counter += 1;
}
id
};
let config = ConfigBuilder::default()
.message_id_fn(message_id_fn)
.fast_message_id_fn(fast_message_id_fn)
.build()
.unwrap();
let (mut gs, _, topic_hashes) = inject_nodes1()
.peer_no(0)
.topics(vec![String::from("topic1")])
.to_subscribe(true)
.gs_config(config)
.create_network();

// The payload is exactly the 8-byte big-endian address of `counters`.
let message = RawMessage {
source: None,
data: counters_address.to_be_bytes().to_vec(),
sequence_number: None,
topic: topic_hashes[0].clone(),
signature: None,
key: None,
validated: true,
};

// Deliver the identical message five times, each time from a freshly generated peer id.
for _ in 0..5 {
gs.handle_received_message(message.clone(), &PeerId::random());
}

// The fast id is computed for every delivery; the slow id only for the first one.
assert_eq!(counters.fast_counter, 5);
assert_eq!(counters.slow_counter, 1);
}

#[test]
fn test_subscribe_to_invalid_topic() {
let t1 = Topic::new("t1");
Expand Down
34 changes: 1 addition & 33 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::time::Duration;

use crate::error::ConfigBuilderError;
use crate::protocol::{ProtocolConfig, ProtocolId, FLOODSUB_PROTOCOL};
use crate::types::{FastMessageId, Message, MessageId, PeerKind, RawMessage};
use crate::types::{Message, MessageId, PeerKind};

use libp2p_identity::PeerId;
use libp2p_swarm::StreamProtocol;
Expand Down Expand Up @@ -78,7 +78,6 @@ pub struct Config {
duplicate_cache_time: Duration,
validate_messages: bool,
message_id_fn: Arc<dyn Fn(&Message) -> MessageId + Send + Sync + 'static>,
fast_message_id_fn: Option<Arc<dyn Fn(&RawMessage) -> FastMessageId + Send + Sync + 'static>>,
allow_self_origin: bool,
do_px: bool,
prune_peers: usize,
Expand Down Expand Up @@ -218,20 +217,6 @@ impl Config {
(self.message_id_fn)(message)
}

/// Computes the optional "fast" id of a raw message.
///
/// If a fast message id function has been configured, it is applied to `message` and its
/// result is returned wrapped in `Some`; if none is configured, this returns `None`.
///
/// Fast ids can be used to avoid possibly expensive transformations from [`RawMessage`] to
/// [`Message`] for duplicates. Two semantically different messages must always have
/// different fast message ids, but it is allowed that two semantically identical messages
/// have different fast message ids as long as the message_id_fn produces the same id for
/// them.
pub fn fast_message_id(&self, message: &RawMessage) -> Option<FastMessageId> {
    // Bail out with `None` when no fast id function was installed.
    let compute_fast_id = self.fast_message_id_fn.as_ref()?;
    Some(compute_fast_id(message))
}

/// By default, gossipsub will reject messages that are sent to us that have the same message
/// source as we have specified locally. Enabling this, allows these messages and prevents
/// penalizing the peer that sent us the message. Default is false.
Expand Down Expand Up @@ -415,7 +400,6 @@ impl Default for ConfigBuilder {
.push_str(&message.sequence_number.unwrap_or_default().to_string());
MessageId::from(source_string)
}),
fast_message_id_fn: None,
allow_self_origin: false,
do_px: false,
prune_peers: 0, // NOTE: Increasing this currently has little effect until Signed records are implemented.
Expand Down Expand Up @@ -634,22 +618,6 @@ impl ConfigBuilder {
self
}

/// Installs a user-defined function that computes fast ids from raw messages.
///
/// Fast ids can be used to avoid possibly expensive transformations from [`RawMessage`] to
/// [`Message`] for duplicates. Two semantically different messages must always have
/// different fast message ids, but it is allowed that two semantically identical messages
/// have different fast message ids as long as the message_id_fn produces the same id for
/// them.
///
/// The function takes a [`RawMessage`] as input and outputs the [`FastMessageId`] for that
/// message. Calling this setter stores the function as `Some(..)` in the configuration
/// being built and returns the builder for chaining.
pub fn fast_message_id_fn<F>(&mut self, fast_id_fn: F) -> &mut Self
where
    F: Fn(&RawMessage) -> FastMessageId + Send + Sync + 'static,
{
    // Type-erase the closure so it can be stored in the config field.
    let shared_fn: Arc<dyn Fn(&RawMessage) -> FastMessageId + Send + Sync + 'static> =
        Arc::new(fast_id_fn);
    self.config.fast_message_id_fn = Some(shared_fn);
    self
}

/// Enables Peer eXchange. This should be enabled in bootstrappers and other well
/// connected/trusted nodes. The default is false.
///
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ pub use self::subscription_filter::{
};
pub use self::topic::{Hasher, Topic, TopicHash};
pub use self::transform::{DataTransform, IdentityTransform};
pub use self::types::{FastMessageId, Message, MessageAcceptance, MessageId, RawMessage, Rpc};
pub use self::types::{Message, MessageAcceptance, MessageId, RawMessage, Rpc};

pub type IdentTopic = Topic<self::topic::IdentityHash>;
pub type Sha256Topic = Topic<self::topic::Sha256Hash>;
10 changes: 0 additions & 10 deletions protocols/gossipsub/src/time_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@ impl<'a, K: 'a, V: 'a> Entry<'a, K, V>
where
K: Eq + std::hash::Hash + Clone,
{
/// Returns a mutable reference to the value of this entry.
///
/// For a vacant entry, `default` is called once and its result is inserted first; for an
/// occupied entry, `default` is never called and the existing value is returned.
pub(crate) fn or_insert_with<F: FnOnce() -> V>(self, default: F) -> &'a mut V {
    match self {
        Entry::Vacant(vacant) => vacant.insert(default()),
        Entry::Occupied(occupied) => occupied.into_mut(),
    }
}
pub(crate) fn or_default(self) -> &'a mut V
where
V: Default,
Expand Down Expand Up @@ -159,10 +153,6 @@ where
/// Reports whether the underlying map currently holds an entry for `key`.
pub(crate) fn contains_key(&self, key: &Key) -> bool {
    let found = self.map.get(key);
    found.is_some()
}

/// Looks up `key` in the underlying map and returns a reference to the element stored for
/// it, or `None` when the map has no entry for `key`.
pub(crate) fn get(&self, key: &Key) -> Option<&Value> {
    let stored = self.map.get(key)?;
    Some(&stored.element)
}
}

pub(crate) struct DuplicateCache<Key>(TimeCache<Key, ()>);
Expand Down
60 changes: 22 additions & 38 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,49 +43,33 @@ pub enum MessageAcceptance {
Ignore,
}

/// Macro for declaring message id types
macro_rules! declare_message_id_type {
($name: ident, $name_string: expr) => {
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct $name(pub Vec<u8>);

impl $name {
pub fn new(value: &[u8]) -> Self {
Self(value.to_vec())
}
}

impl<T: Into<Vec<u8>>> From<T> for $name {
fn from(value: T) -> Self {
Self(value.into())
}
}
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct MessageId(pub Vec<u8>);

impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex_fmt::HexFmt(&self.0))
}
}
impl MessageId {
pub fn new(value: &[u8]) -> Self {
Self(value.to_vec())
}
}

impl std::fmt::Debug for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}({})", $name_string, hex_fmt::HexFmt(&self.0))
}
}
};
impl<T: Into<Vec<u8>>> From<T> for MessageId {
fn from(value: T) -> Self {
Self(value.into())
}
}

// A type for gossipsub message ids.
declare_message_id_type!(MessageId, "MessageId");
impl std::fmt::Display for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex_fmt::HexFmt(&self.0))
}
}

// A type for gossipsub fast message ids, not to be confused with "real" message ids.
//
// A fast-message-id is an optional message_id that can be used to filter duplicates quickly. On
// high-traffic networks with lots of messages, where the message_id is based on the result of
// decompressed traffic, it is beneficial to specify a `fast-message-id` that can identify and
// filter duplicates quickly without incurring the overhead of decompression.
declare_message_id_type!(FastMessageId, "FastMessageId");
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
impl std::fmt::Debug for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0))
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PeerConnections {
Expand Down