Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

IPC-39: Publish preemptive data #112

Merged
merged 20 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 112 additions & 10 deletions ipld/resolver/src/behaviour/membership.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2022-2023 Protocol Labs
// SPDX-License-Identifier: MIT
use std::collections::{HashSet, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::{Context, Poll};
use std::time::Duration;

Expand Down Expand Up @@ -36,6 +36,8 @@ use super::NetworkConfig;
const PUBSUB_MEMBERSHIP: &str = "/ipc/membership";
/// `Gossipsub` topic identifier for voting about content.
const PUBSUB_VOTES: &str = "/ipc/ipld/votes";
/// `Gossipsub` topic identifier for pre-emptively published blocks of data.
const PUBSUB_PREEMPTIVE: &str = "/ipc/ipld/pre-emptive";

/// Events emitted by the [`membership::Behaviour`] behaviour.
#[derive(Debug)]
Expand All @@ -53,6 +55,9 @@ pub enum Event {

/// We received a [`VoteRecord`] in one of the subnets we are providing data for.
ReceivedVote(Box<VoteRecord>),

/// We received preemptive data published in a subnet we were interested in.
ReceivedPreemptive(SubnetID, Vec<u8>),
}

/// Configuration for [`membership::Behaviour`].
Expand Down Expand Up @@ -97,6 +102,8 @@ pub struct Behaviour {
subnet_ids: Vec<SubnetID>,
/// Voting topics we are currently subscribed to.
voting_topics: HashSet<TopicHash>,
/// Remember which subnet a topic was about.
preemptive_topics: HashMap<TopicHash, SubnetID>,
/// Caching the latest state of subnet providers.
provider_cache: SubnetProviderCache,
/// Interval between publishing the currently supported subnets.
Expand Down Expand Up @@ -153,21 +160,87 @@ impl Behaviour {
let mut interval = tokio::time::interval(mc.publish_interval);
interval.reset();

Ok(Self {
// Not passing static subnets here; using pinning below instead so it subscribes as well
let provider_cache = SubnetProviderCache::new(mc.max_subnets, vec![]);

let mut membership = Self {
inner: gossipsub,
outbox: Default::default(),
local_key: nc.local_key,
network_name: nc.network_name,
membership_topic,
subnet_ids: Default::default(),
voting_topics: Default::default(),
provider_cache: SubnetProviderCache::new(mc.max_subnets, mc.static_subnets),
preemptive_topics: Default::default(),
provider_cache,
publish_interval: interval,
min_time_between_publish: mc.min_time_between_publish,
last_publish_timestamp: Timestamp::default(),
next_publish_timestamp: Timestamp::now() + mc.publish_interval,
max_provider_age: mc.max_provider_age,
})
};

for subnet_id in mc.static_subnets {
membership.pin_subnet(subnet_id)?;
}

Ok(membership)
}

/// Construct the topic used to gossip about pre-emptively published data.
///
/// Replaces "/" with "_" to avoid clashes from prefix/suffix overlap.
fn preemptive_topic(&self, subnet_id: &SubnetID) -> Sha256Topic {
Topic::new(format!(
"{}/{}/{}",
PUBSUB_PREEMPTIVE,
self.network_name.replace('/', "_"),
subnet_id.to_string().replace('/', "_")
))
}

/// Subscribe to a preemptive topic.
fn preemptive_subscribe(&mut self, subnet_id: SubnetID) -> Result<(), SubscriptionError> {
let topic = self.preemptive_topic(&subnet_id);
self.inner.subscribe(&topic)?;
self.preemptive_topics.insert(topic.hash(), subnet_id);
Ok(())
}

/// Subscribe to a preemptive topic.
fn preemptive_unsubscribe(&mut self, subnet_id: &SubnetID) -> anyhow::Result<()> {
let topic = self.preemptive_topic(subnet_id);
self.inner.unsubscribe(&topic)?;
self.preemptive_topics.remove(&topic.hash());
Ok(())
}

/// Construct the topic used to gossip about votes.
///
/// Replaces "/" with "_" to avoid clashes from prefix/suffix overlap.
fn voting_topic(&self, subnet_id: &SubnetID) -> Sha256Topic {
Topic::new(format!(
"{}/{}/{}",
PUBSUB_VOTES,
self.network_name.replace('/', "_"),
subnet_id.to_string().replace('/', "_")
))
}

/// Subscribe to a voting topic.
fn voting_subscribe(&mut self, subnet_id: &SubnetID) -> Result<(), SubscriptionError> {
let topic = self.voting_topic(subnet_id);
self.inner.subscribe(&topic)?;
self.voting_topics.insert(topic.hash());
Ok(())
}

/// Unsubscribe from a voting topic.
fn voting_unsubscribe(&mut self, subnet_id: &SubnetID) -> anyhow::Result<()> {
let topic = self.voting_topic(subnet_id);
self.inner.unsubscribe(&topic)?;
self.voting_topics.remove(&topic.hash());
Ok(())
}

/// Construct the topic used to gossip about votes.
Expand Down Expand Up @@ -237,18 +310,23 @@ impl Behaviour {
self.publish_membership()
}

/// Make sure a subnet is not pruned.
/// Make sure a subnet is not pruned, so we always track its providers.
/// Also subscribe to pre-emptively published blocks of data.
///
/// This method could be called in a parent subnet when the ledger indicates
/// there is a known child subnet, so we make sure this subnet cannot be
/// crowded out during the initial phase of bootstrapping the network.
pub fn pin_subnet(&mut self, subnet_id: SubnetID) {
self.provider_cache.pin_subnet(subnet_id)
pub fn pin_subnet(&mut self, subnet_id: SubnetID) -> Result<(), SubscriptionError> {
self.preemptive_subscribe(subnet_id.clone())?;
self.provider_cache.pin_subnet(subnet_id);
Ok(())
}

/// Make a subnet pruneable.
pub fn unpin_subnet(&mut self, subnet_id: &SubnetID) {
self.provider_cache.unpin_subnet(subnet_id)
/// Make a subnet pruneable and unsubscribe from pre-emptive data.
pub fn unpin_subnet(&mut self, subnet_id: &SubnetID) -> anyhow::Result<()> {
self.preemptive_unsubscribe(subnet_id)?;
self.provider_cache.unpin_subnet(subnet_id);
Ok(())
}

/// Send a message through Gossipsub to let everyone know about the current configuration.
Expand Down Expand Up @@ -287,6 +365,23 @@ impl Behaviour {
}
}

/// Publish arbitrary data to the pre-emptive topic of a subnet.
///
/// We are not expected to be subscribed to this topic, only agents on the parent subnet are.
pub fn publish_preemptive(&mut self, subnet_id: SubnetID, data: Vec<u8>) -> anyhow::Result<()> {
let topic = self.preemptive_topic(&subnet_id);
match self.inner.publish(topic, data) {
Err(e) => {
stats::MEMBERSHIP_PUBLISH_FAILURE.inc();
Err(anyhow!(e))
}
Ok(_msg_id) => {
stats::MEMBERSHIP_PUBLISH_SUCCESS.inc();
Ok(())
}
}
}

/// Mark a peer as routable in the cache.
///
/// Call this method when the discovery service learns the address of a peer.
Expand Down Expand Up @@ -339,6 +434,8 @@ impl Behaviour {
);
}
}
} else if let Some(subnet_id) = self.preemptive_topics.get(&msg.topic) {
self.handle_preemptive_data(subnet_id.clone(), msg.data)
} else {
stats::MEMBERSHIP_UNKNOWN_TOPIC.inc();
warn!(
Expand Down Expand Up @@ -380,6 +477,11 @@ impl Behaviour {
self.outbox.push_back(Event::ReceivedVote(Box::new(record)))
}

fn handle_preemptive_data(&mut self, subnet_id: SubnetID, data: Vec<u8>) {
self.outbox
.push_back(Event::ReceivedPreemptive(subnet_id, data))
}

/// Handle new subscribers to the membership topic.
fn handle_subscriber(&mut self, peer_id: PeerId, topic: TopicHash) {
if topic == self.membership_topic.hash() {
Expand Down
9 changes: 9 additions & 0 deletions ipld/resolver/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,17 @@ impl Client {
self.send_request(req)
}

/// Publish a signed vote into a topic based on its subnet.
pub fn publish_vote(&self, vote: SignedVoteRecord) -> anyhow::Result<()> {
let req = Request::PublishVote(Box::new(vote));
self.send_request(req)
}

/// Publish pre-emptively to a subnet that agents in the parent subnet
/// would be subscribed to if they are interested in receiving data
/// before they would have to use [`Client::resolve`] instead.
pub fn publish_preemptive(&self, subnet_id: SubnetID, data: Vec<u8>) -> anyhow::Result<()> {
let req = Request::PublishPreemptive(subnet_id, data);
self.send_request(req)
}
}
27 changes: 24 additions & 3 deletions ipld/resolver/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub(crate) enum Request {
AddProvidedSubnet(SubnetID),
RemoveProvidedSubnet(SubnetID),
PublishVote(Box<SignedVoteRecord>),
PublishPreemptive(SubnetID, Vec<u8>),
PinSubnet(SubnetID),
UnpinSubnet(SubnetID),
Resolve(Cid, SubnetID, ResponseChannel),
Expand All @@ -102,6 +103,8 @@ pub(crate) enum Request {
pub enum Event {
/// Received a vote about in a subnet about a CID.
ReceivedVote(Box<VoteRecord>),
/// Received raw pre-emptive data published to a pinned subnet.
ReceivedPreemptive(SubnetID, Vec<u8>),
}

/// The `Service` handles P2P communication to resolve IPLD content by wrapping and driving a number of `libp2p` behaviours.
Expand Down Expand Up @@ -360,6 +363,12 @@ impl<P: StoreParams> Service<P> {
debug!("dropped received vote because there are no subscribers")
}
}
membership::Event::ReceivedPreemptive(subnet_id, data) => {
let event = Event::ReceivedPreemptive(subnet_id, data);
if self.event_tx.send(event).is_err() {
debug!("dropped received preemptive data because there are no subscribers")
}
}
}
}

Expand Down Expand Up @@ -415,9 +424,21 @@ impl<P: StoreParams> Service<P> {
warn!("failed to publish vote: {e}")
}
}
Request::PinSubnet(id) => self.membership_mut().pin_subnet(id),
Request::UnpinSubnet(id) => self.membership_mut().unpin_subnet(&id),

Request::PublishPreemptive(subnet_id, data) => {
if let Err(e) = self.membership_mut().publish_preemptive(subnet_id, data) {
warn!("failed to publish pre-emptive data: {e}")
}
}
Request::PinSubnet(id) => {
if let Err(e) = self.membership_mut().pin_subnet(id) {
warn!("error pinning subnet: {e}")
}
}
Request::UnpinSubnet(id) => {
if let Err(e) = self.membership_mut().unpin_subnet(&id) {
warn!("error unpinning subnet: {e}")
}
}
Request::Resolve(cid, subnet_id, response_channel) => {
self.start_query(cid, subnet_id, response_channel)
}
Expand Down
59 changes: 57 additions & 2 deletions ipld/resolver/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,63 @@ async fn single_bootstrap_publish_receive_vote() {
.expect("timeout receiving vote")
.expect("error receiving vote");

let Event::ReceivedVote(v) = event;
assert_eq!(&*v, vote.record());
if let Event::ReceivedVote(v) = event {
assert_eq!(&*v, vote.record());
} else {
panic!("unexpected {event:?}")
}
}

/// Start two agents, pin a subnet, publish preemptively and receive.
#[tokio::test]
async fn single_bootstrap_publish_receive_preemptive() {
let _ = env_logger::builder().is_test(true).try_init();

// TODO: Get the seed from QuickCheck
let mut builder = ClusterBuilder::new(2);

// Build a cluster of nodes.
for i in 0..builder.size {
builder.add_node(if i == 0 { None } else { Some(0) });
}

// Start the swarms.
let mut cluster = builder.run();

// Wait a little for the cluster to connect.
// TODO: Wait on some condition instead of sleep.
tokio::time::sleep(Duration::from_secs(1)).await;

// Pin a subnet on the bootstrap node.
let subnet_id = make_subnet_id(1001);

cluster.agents[0]
.client
.pin_subnet(subnet_id.clone())
.expect("failed to pin subnet");

// TODO: Wait on some condition instead of sleep.
tokio::time::sleep(Duration::from_secs(1)).await;

// Publish some content from the other agent.
let data = vec![1, 2, 3];
cluster.agents[1]
.client
.publish_preemptive(subnet_id.clone(), data.clone())
.expect("failed to send vote");

// Receive pre-emptive data..
let event = timeout(Duration::from_secs(2), cluster.agents[0].events.recv())
.await
.expect("timeout receiving data")
.expect("error receiving data");

if let Event::ReceivedPreemptive(s, d) = event {
assert_eq!(s, subnet_id);
assert_eq!(d, data);
} else {
panic!("unexpected {event:?}")
}
}

fn make_service(config: Config) -> (Service<TestStoreParams>, TestBlockstore) {
Expand Down