Avoid collecting all RecordStore records into a Vec in the behaviour #3021

Closed
13 changes: 4 additions & 9 deletions protocols/kad/src/behaviour.rs
@@ -398,8 +398,7 @@ impl KademliaConfig {

impl<TStore> Kademlia<TStore>
where
- for<'a> TStore: RecordStore<'a>,
- TStore: Send + 'static,
+ TStore: RecordStore + Send + 'static,
{
/// Creates a new `Kademlia` network behaviour with a default configuration.
pub fn new(id: PeerId, store: TStore) -> Self {
@@ -688,10 +687,7 @@ where
if record.is_expired(Instant::now()) {
self.store.remove(&key)
} else {
- records.push(PeerRecord {
- peer: None,
- record: record.into_owned(),
- });
+ records.push(PeerRecord { peer: None, record });
}
}

@@ -1780,8 +1776,7 @@ fn exp_decrease(ttl: Duration, exp: u32) -> Duration {

impl<TStore> NetworkBehaviour for Kademlia<TStore>
where
- for<'a> TStore: RecordStore<'a>,
- TStore: Send + 'static,
+ TStore: RecordStore + Send + 'static,
{
type ConnectionHandler = KademliaHandlerProto<QueryId>;
type OutEvent = KademliaEvent;
@@ -2097,7 +2092,7 @@ where
self.store.remove(&key);
None
} else {
- Some(record.into_owned())
+ Some(record)
}
}
None => None,
2 changes: 1 addition & 1 deletion protocols/kad/src/behaviour/test.rs
@@ -596,7 +596,7 @@ fn put_record() {
Ok(ok) => {
assert!(records.contains_key(&ok.key));
let record = swarm.behaviour_mut().store.get(&ok.key).unwrap();
- results.push(record.into_owned());
+ results.push(record);
}
}
}
55 changes: 24 additions & 31 deletions protocols/kad/src/jobs.rs
@@ -62,6 +62,7 @@
//! > out of the job to the consumer, where they can be dropped after being sent.

use crate::record::{self, store::RecordStore, ProviderRecord, Record};
+ use crate::store::{ProviderRecordsIter, RecordsIter};
use futures::prelude::*;
use futures_timer::Delay;
use instant::Instant;
@@ -70,7 +71,6 @@ use std::collections::HashSet;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
- use std::vec;

/// The maximum number of queries towards which background jobs
/// are allowed to start new queries on an invocation of
@@ -135,7 +135,7 @@ pub struct PutRecordJob {
publish_interval: Option<Duration>,
record_ttl: Option<Duration>,
skipped: HashSet<record::Key>,
- inner: PeriodicJob<vec::IntoIter<Record>>,
+ inner: PeriodicJob<RecordsIter>,
}

impl PutRecordJob {
@@ -193,36 +193,33 @@ impl PutRecordJob {
/// to be run.
pub fn poll<T>(&mut self, cx: &mut Context<'_>, store: &mut T, now: Instant) -> Poll<Record>
where
- for<'a> T: RecordStore<'a>,
+ T: RecordStore,
{
if self.inner.check_ready(cx, now) {
let publish = self.next_publish.map_or(false, |t_pub| now >= t_pub);
- let records = store
- .records()
- .filter_map(|r| {
- let is_publisher = r.publisher.as_ref() == Some(&self.local_id);
- if self.skipped.contains(&r.key) || (!publish && is_publisher) {
- None
- } else {
- let mut record = r.into_owned();
- if publish && is_publisher {
- record.expires = record
- .expires
- .or_else(|| self.record_ttl.map(|ttl| now + ttl));
- }
- Some(record)
- }
- })
- .collect::<Vec<_>>()
- .into_iter();
+ #[allow(clippy::mutable_key_type)]
+ let mut skipped = Default::default();
+ std::mem::swap(&mut skipped, &mut self.skipped);
+ let record_ttl = self.record_ttl;
+ let local_id = self.local_id;
+ let records = Box::new(store.records().filter_map(move |r| {
+ let is_publisher = r.publisher.as_ref() == Some(&local_id);
+ if skipped.contains(&r.key) || (!publish && is_publisher) {
+ None
+ } else {
+ let mut record = r;
+ if publish && is_publisher {
+ record.expires = record.expires.or_else(|| record_ttl.map(|ttl| now + ttl));
+ }
+ Some(record)
+ }
+ }));

// Schedule the next publishing run.
if publish {
self.next_publish = self.publish_interval.map(|i| now + i);
}

- self.skipped.clear();

self.inner.state = PeriodicJobState::Running(records);
}

Review comment (Member), on the new store.records().filter_map(...) call (a sketch of the trade-off follows this hunk): store.records now returns an owned Iter. With the filter_map right after we might be cloning many items in vain. Isn't that an issue?
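To make that concern concrete, here is a self-contained sketch. Record and MemoryStore below are simplified stand-ins rather than the crate's real types, and the lifetime-carrying LazyRecordsIter alias is a hypothetical alternative, not something this PR adds. With the 'static boxed iterator introduced here, the in-memory store clones and collects every record before the job's filter_map can discard any of them; a borrowing alias would defer each clone until the record is pulled, and only the pre-existing Cow-based API avoided cloning discarded records altogether.

```rust
use std::collections::HashMap;

// Simplified stand-ins, only so the sketch compiles on its own.
#[derive(Clone, Debug, PartialEq)]
struct Record {
    key: String,
    value: Vec<u8>,
}

struct MemoryStore {
    records: HashMap<String, Record>,
}

// Shape used by this PR: an owned, fully 'static iterator, so the store must
// clone and collect every record before the caller's filter_map runs.
type OwnedRecordsIter = Box<dyn Iterator<Item = Record> + Send + Sync + 'static>;

// Hypothetical alternative (not in this PR): carry a lifetime so the iterator
// may borrow the store and clone one record per pull instead of all up front.
type LazyRecordsIter<'a> = Box<dyn Iterator<Item = Record> + Send + 'a>;

impl MemoryStore {
    fn records_owned(&self) -> OwnedRecordsIter {
        // Eager: every stored record is cloned, even ones a later filter drops.
        Box::new(self.records.values().cloned().collect::<Vec<_>>().into_iter())
    }

    fn records_lazy(&self) -> LazyRecordsIter<'_> {
        // Lazy: a record is cloned only when the consumer pulls it, although
        // records the consumer then discards have still been cloned once.
        Box::new(self.records.values().cloned())
    }
}

fn main() {
    let mut records = HashMap::new();
    records.insert("k".to_owned(), Record { key: "k".to_owned(), value: vec![1] });
    let store = MemoryStore { records };

    let eager: Vec<Record> = store.records_owned().filter(|r| !r.value.is_empty()).collect();
    let lazy: Vec<Record> = store.records_lazy().filter(|r| !r.value.is_empty()).collect();
    assert_eq!(eager, lazy);
}
```

Whether the extra clones matter depends on the store: the in-memory implementation in this PR simply collects into a Vec, while a store that owns or shares its backing data could still hand out records on demand behind the owned iterator.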

@@ -251,7 +248,7 @@

/// Periodic job for replicating provider records.
pub struct AddProviderJob {
- inner: PeriodicJob<vec::IntoIter<ProviderRecord>>,
+ inner: PeriodicJob<ProviderRecordsIter>,
}

impl AddProviderJob {
@@ -294,14 +291,10 @@
now: Instant,
) -> Poll<ProviderRecord>
where
- for<'a> T: RecordStore<'a>,
+ T: RecordStore,
{
if self.inner.check_ready(cx, now) {
- let records = store
- .provided()
- .map(|r| r.into_owned())
- .collect::<Vec<_>>()
- .into_iter();
+ let records = store.provided();
self.inner.state = PeriodicJobState::Running(records);
}

@@ -368,7 +361,7 @@ mod tests {
block_on(poll_fn(|ctx| {
let now = Instant::now() + job.inner.interval;
// All (non-expired) records in the store must be yielded by the job.
- for r in store.records().map(|r| r.into_owned()).collect::<Vec<_>>() {
+ for r in store.records().collect::<Vec<_>>() {
if !r.is_expired(now) {
assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
assert!(job.is_running());
@@ -398,7 +391,7 @@ mod tests {
block_on(poll_fn(|ctx| {
let now = Instant::now() + job.inner.interval;
// All (non-expired) records in the store must be yielded by the job.
- for r in store.provided().map(|r| r.into_owned()).collect::<Vec<_>>() {
+ for r in store.provided().collect::<Vec<_>>() {
if !r.is_expired(now) {
assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
assert!(job.is_running());
25 changes: 12 additions & 13 deletions protocols/kad/src/record/store.rs
@@ -25,7 +25,6 @@ use thiserror::Error;

use super::*;
use crate::K_VALUE;
- use std::borrow::Cow;

/// The result of an operation on a `RecordStore`.
pub type Result<T> = std::result::Result<T, Error>;
@@ -46,6 +45,9 @@ pub enum Error {
ValueTooLarge,
}

+ pub type RecordsIter = Box<dyn Iterator<Item = Record> + Send + Sync + 'static>;
+ pub type ProviderRecordsIter = Box<dyn Iterator<Item = ProviderRecord> + Send + Sync + 'static>;

/// Trait for types implementing a record store.
///
/// There are two types of records managed by a `RecordStore`:
@@ -64,36 +66,33 @@ pub enum Error {
/// content. Just like a regular record, a provider record is distributed
/// to the closest nodes to the key.
///
- pub trait RecordStore<'a> {
- type RecordsIter: Iterator<Item = Cow<'a, Record>>;
- type ProvidedIter: Iterator<Item = Cow<'a, ProviderRecord>>;
+ pub trait RecordStore {

Review comment (Member), on the removed pub trait RecordStore<'a> line (a sketch of the alternative it suggests follows this file's diff): Shouldn't 'a have been on the individual methods instead of the entire trait definition?
/// Gets a record from the store, given its key.
- fn get(&'a self, k: &Key) -> Option<Cow<'_, Record>>;
+ fn get(&self, k: &Key) -> Option<Record>;

/// Puts a record into the store.
- fn put(&'a mut self, r: Record) -> Result<()>;
+ fn put(&mut self, r: Record) -> Result<()>;

/// Removes the record with the given key from the store.
- fn remove(&'a mut self, k: &Key);
+ fn remove(&mut self, k: &Key);

/// Gets an iterator over all (value-) records currently stored.
- fn records(&'a self) -> Self::RecordsIter;
+ fn records(&self) -> RecordsIter;

/// Adds a provider record to the store.
///
/// A record store only needs to store a number of provider records
/// for a key corresponding to the replication factor and should
/// store those records whose providers are closest to the key.
- fn add_provider(&'a mut self, record: ProviderRecord) -> Result<()>;
+ fn add_provider(&mut self, record: ProviderRecord) -> Result<()>;

/// Gets a copy of the stored provider records for the given key.
- fn providers(&'a self, key: &Key) -> Vec<ProviderRecord>;
+ fn providers(&self, key: &Key) -> Vec<ProviderRecord>;

/// Gets an iterator over all stored provider records for which the
/// node owning the store is itself the provider.
- fn provided(&'a self) -> Self::ProvidedIter;
+ fn provided(&self) -> ProviderRecordsIter;

/// Removes a provider record from the store.
- fn remove_provider(&'a mut self, k: &Key, p: &PeerId);
+ fn remove_provider(&mut self, k: &Key, p: &PeerId);
}
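On the lifetime question above, here is a minimal sketch of the alternative shape the comment alludes to; this is not what the PR merges, and Record, Key and MemoryStore are simplified stand-ins. The lifetime moves onto each method, the borrowed Cow items stay (so callers only clone records they keep), and the price is a boxed iterator; keeping per-call lifetimes without the Box would need generic associated types, which were only just stabilizing (Rust 1.65) when this PR was written.

```rust
use std::borrow::Cow;
use std::collections::HashMap;

// Simplified stand-ins, only so the sketch compiles on its own.
type Key = Vec<u8>;

#[derive(Clone, Debug, PartialEq)]
struct Record {
    key: Key,
    value: Vec<u8>,
}

// Hypothetical trait shape: no lifetime parameter on the trait itself and no
// for<'a> bound at the use sites; each method borrows the store for as long as
// its return value lives.
trait RecordStore {
    fn get<'a>(&'a self, k: &Key) -> Option<Cow<'a, Record>>;
    fn records<'a>(&'a self) -> Box<dyn Iterator<Item = Cow<'a, Record>> + 'a>;
}

struct MemoryStore {
    records: HashMap<Key, Record>,
}

impl RecordStore for MemoryStore {
    fn get<'a>(&'a self, k: &Key) -> Option<Cow<'a, Record>> {
        self.records.get(k).map(Cow::Borrowed)
    }

    fn records<'a>(&'a self) -> Box<dyn Iterator<Item = Cow<'a, Record>> + 'a> {
        // Borrowed and lazy: nothing is cloned until a caller calls into_owned().
        Box::new(self.records.values().map(Cow::Borrowed))
    }
}

fn main() {
    let mut store = MemoryStore { records: HashMap::new() };
    let r = Record { key: b"k".to_vec(), value: vec![1] };
    store.records.insert(r.key.clone(), r.clone());

    // Only records that survive the filter are ever cloned.
    let kept: Vec<Record> = store
        .records()
        .filter(|c| !c.value.is_empty())
        .map(|c| c.into_owned())
        .collect();
    assert_eq!(kept, vec![r]);
}
```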
55 changes: 27 additions & 28 deletions protocols/kad/src/record/store/memory.rs
@@ -23,9 +23,7 @@ use super::*;
use crate::kbucket;
use libp2p_core::PeerId;
use smallvec::SmallVec;
- use std::borrow::Cow;
- use std::collections::{hash_map, hash_set, HashMap, HashSet};
- use std::iter;
+ use std::collections::{hash_map, HashMap, HashSet};

/// In-memory implementation of a `RecordStore`.
pub struct MemoryStore {
@@ -96,20 +94,12 @@ impl MemoryStore {
}
}

- impl<'a> RecordStore<'a> for MemoryStore {
- type RecordsIter =
- iter::Map<hash_map::Values<'a, Key, Record>, fn(&'a Record) -> Cow<'a, Record>>;
-
- type ProvidedIter = iter::Map<
- hash_set::Iter<'a, ProviderRecord>,
- fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>,
- >;
-
- fn get(&'a self, k: &Key) -> Option<Cow<'_, Record>> {
- self.records.get(k).map(Cow::Borrowed)
+ impl RecordStore for MemoryStore {
+ fn get(&self, k: &Key) -> Option<Record> {
+ self.records.get(k).cloned()
}

- fn put(&'a mut self, r: Record) -> Result<()> {
+ fn put(&mut self, r: Record) -> Result<()> {
if r.value.len() >= self.config.max_value_bytes {
return Err(Error::ValueTooLarge);
}
@@ -131,15 +121,21 @@ impl<'a> RecordStore<'a> for MemoryStore {
Ok(())
}

- fn remove(&'a mut self, k: &Key) {
+ fn remove(&mut self, k: &Key) {
self.records.remove(k);
}

- fn records(&'a self) -> Self::RecordsIter {
- self.records.values().map(Cow::Borrowed)
+ fn records(&self) -> RecordsIter {
+ Box::new(
+ self.records
+ .values()
+ .cloned()
+ .collect::<Vec<_>>()
+ .into_iter(),
+ )
}

- fn add_provider(&'a mut self, record: ProviderRecord) -> Result<()> {
+ fn add_provider(&mut self, record: ProviderRecord) -> Result<()> {
let num_keys = self.providers.len();

// Obtain the entry
@@ -189,17 +185,23 @@ impl<'a> RecordStore<'a> for MemoryStore {
Ok(())
}

- fn providers(&'a self, key: &Key) -> Vec<ProviderRecord> {
+ fn providers(&self, key: &Key) -> Vec<ProviderRecord> {
self.providers
.get(key)
.map_or_else(Vec::new, |ps| ps.clone().into_vec())
}

- fn provided(&'a self) -> Self::ProvidedIter {
- self.provided.iter().map(Cow::Borrowed)
+ fn provided(&self) -> ProviderRecordsIter {
+ Box::new(
+ self.provided
+ .iter()
+ .cloned()
+ .collect::<Vec<_>>()
+ .into_iter(),
+ )
}

- fn remove_provider(&'a mut self, key: &Key, provider: &PeerId) {
+ fn remove_provider(&mut self, key: &Key, provider: &PeerId) {
if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
let providers = e.get_mut();
if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
@@ -233,7 +235,7 @@ mod tests {
fn prop(r: Record) {
let mut store = MemoryStore::new(PeerId::random());
assert!(store.put(r.clone()).is_ok());
- assert_eq!(Some(Cow::Borrowed(&r)), store.get(&r.key));
+ assert_eq!(Some(r.clone()), store.get(&r.key));
store.remove(&r.key);
assert!(store.get(&r.key).is_none());
}
@@ -283,10 +285,7 @@ mod tests {
let key = random_multihash();
let rec = ProviderRecord::new(key, id, Vec::new());
assert!(store.add_provider(rec.clone()).is_ok());
- assert_eq!(
- vec![Cow::Borrowed(&rec)],
- store.provided().collect::<Vec<_>>()
- );
+ assert_eq!(vec![rec.clone()], store.provided().collect::<Vec<_>>());
store.remove_provider(&rec.key, &id);
assert_eq!(store.provided().count(), 0);
}