Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): add send guards on collect #665

Merged
merged 4 commits into from
Jun 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/collections/binary_heap/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{self, IntoStream};

impl<T: Ord> stream::Extend<T> for BinaryHeap<T> {
impl<T: Ord + Send> stream::Extend<T> for BinaryHeap<T> {
fn extend<'a, S: IntoStream<Item = T> + 'a>(
&'a mut self,
stream: S,
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

self.reserve(stream.size_hint().0);
Expand Down
7 changes: 5 additions & 2 deletions src/collections/binary_heap/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{self, FromStream, IntoStream};

impl<T: Ord> FromStream<T> for BinaryHeap<T> {
impl<T: Ord + Send> FromStream<T> for BinaryHeap<T> {
#[inline]
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

Box::pin(async move {
Expand Down
7 changes: 5 additions & 2 deletions src/collections/btree_map/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{self, IntoStream};

impl<K: Ord, V> stream::Extend<(K, V)> for BTreeMap<K, V> {
impl<K: Ord + Send, V: Send> stream::Extend<(K, V)> for BTreeMap<K, V> {
fn extend<'a, S: IntoStream<Item = (K, V)> + 'a>(
&'a mut self,
stream: S,
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
Box::pin(stream.into_stream().for_each(move |(k, v)| {
self.insert(k, v);
}))
Expand Down
7 changes: 5 additions & 2 deletions src/collections/btree_map/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{self, FromStream, IntoStream};

impl<K: Ord, V> FromStream<(K, V)> for BTreeMap<K, V> {
impl<K: Ord + Send, V: Send> FromStream<(K, V)> for BTreeMap<K, V> {
#[inline]
fn from_stream<'a, S: IntoStream<Item = (K, V)> + 'a>(
stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

Box::pin(async move {
Expand Down
7 changes: 5 additions & 2 deletions src/collections/btree_set/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{self, IntoStream};

impl<T: Ord> stream::Extend<T> for BTreeSet<T> {
impl<T: Ord + Send> stream::Extend<T> for BTreeSet<T> {
fn extend<'a, S: IntoStream<Item = T> + 'a>(
&'a mut self,
stream: S,
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
Box::pin(stream.into_stream().for_each(move |item| {
self.insert(item);
}))
Expand Down
7 changes: 5 additions & 2 deletions src/collections/btree_set/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{self, FromStream, IntoStream};

impl<T: Ord> FromStream<T> for BTreeSet<T> {
impl<T: Ord + Send> FromStream<T> for BTreeSet<T> {
#[inline]
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

Box::pin(async move {
Expand Down
10 changes: 7 additions & 3 deletions src/collections/hash_map/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ use crate::stream::{self, IntoStream};

impl<K, V, H> stream::Extend<(K, V)> for HashMap<K, V, H>
where
K: Eq + Hash,
H: BuildHasher + Default,
K: Eq + Hash + Send,
V: Send,
H: BuildHasher + Default + Send,
{
fn extend<'a, S: IntoStream<Item = (K, V)> + 'a>(
&'a mut self,
stream: S,
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

// The following is adapted from the hashbrown source code:
Expand Down
10 changes: 7 additions & 3 deletions src/collections/hash_map/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ use crate::stream::{self, FromStream, IntoStream};

impl<K, V, H> FromStream<(K, V)> for HashMap<K, V, H>
where
K: Eq + Hash,
H: BuildHasher + Default,
K: Eq + Hash + Send,
H: BuildHasher + Default + Send,
V: Send,
{
#[inline]
fn from_stream<'a, S: IntoStream<Item = (K, V)> + 'a>(
stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

Box::pin(async move {
Expand Down
9 changes: 6 additions & 3 deletions src/collections/hash_set/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ use crate::stream::{self, IntoStream};

impl<T, H> stream::Extend<T> for HashSet<T, H>
where
T: Eq + Hash,
H: BuildHasher + Default,
T: Eq + Hash + Send,
H: BuildHasher + Default + Send,
{
fn extend<'a, S: IntoStream<Item = T> + 'a>(
&'a mut self,
stream: S,
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
// The Extend impl for HashSet in the standard library delegates to the internal HashMap.
// Thus, this impl is just a copy of the async Extend impl for HashMap in this crate.

Expand Down
9 changes: 6 additions & 3 deletions src/collections/hash_set/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ use crate::stream::{self, FromStream, IntoStream};

impl<T, H> FromStream<T> for HashSet<T, H>
where
T: Eq + Hash,
H: BuildHasher + Default,
T: Eq + Hash + Send,
H: BuildHasher + Default + Send,
{
#[inline]
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

Box::pin(async move {
Expand Down
7 changes: 5 additions & 2 deletions src/collections/linked_list/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{self, IntoStream};

impl<T> stream::Extend<T> for LinkedList<T> {
impl<T: Send> stream::Extend<T> for LinkedList<T> {
fn extend<'a, S: IntoStream<Item = T> + 'a>(
&'a mut self,
stream: S,
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();
Box::pin(stream.for_each(move |item| self.push_back(item)))
}
Expand Down
7 changes: 5 additions & 2 deletions src/collections/linked_list/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{self, FromStream, IntoStream};

impl<T> FromStream<T> for LinkedList<T> {
impl<T: Send> FromStream<T> for LinkedList<T> {
#[inline]
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

Box::pin(async move {
Expand Down
7 changes: 5 additions & 2 deletions src/collections/vec_deque/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{self, IntoStream};

impl<T> stream::Extend<T> for VecDeque<T> {
impl<T: Send> stream::Extend<T> for VecDeque<T> {
fn extend<'a, S: IntoStream<Item = T> + 'a>(
&'a mut self,
stream: S,
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

self.reserve(stream.size_hint().0);
Expand Down
7 changes: 5 additions & 2 deletions src/collections/vec_deque/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{self, FromStream, IntoStream};

impl<T> FromStream<T> for VecDeque<T> {
impl<T: Send> FromStream<T> for VecDeque<T> {
#[inline]
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

Box::pin(async move {
Expand Down
7 changes: 5 additions & 2 deletions src/option/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::prelude::*;
use crate::stream::{FromStream, IntoStream};
use std::convert::identity;

impl<T, V> FromStream<Option<T>> for Option<V>
impl<T: Send, V> FromStream<Option<T>> for Option<V>
where
V: FromStream<T>,
{
Expand All @@ -14,7 +14,10 @@ where
#[inline]
fn from_stream<'a, S: IntoStream<Item = Option<T>> + 'a>(
stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

Box::pin(async move {
Expand Down
12 changes: 9 additions & 3 deletions src/path/pathbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,10 @@ impl<P: AsRef<Path>> stream::Extend<P> for PathBuf {
fn extend<'a, S: IntoStream<Item = P> + 'a>(
&'a mut self,
stream: S,
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

Box::pin(async move {
Expand All @@ -337,11 +340,14 @@ impl<P: AsRef<Path>> stream::Extend<P> for PathBuf {
}

#[cfg(feature = "unstable")]
impl<'b, P: AsRef<Path> + 'b> FromStream<P> for PathBuf {
impl<'b, P: AsRef<Path> + 'b + Send> FromStream<P> for PathBuf {
#[inline]
fn from_stream<'a, S: IntoStream<Item = P> + 'a>(
stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

Box::pin(async move {
Expand Down
7 changes: 6 additions & 1 deletion src/result/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::stream::{FromStream, IntoStream};

impl<T, E, V> FromStream<Result<T, E>> for Result<V, E>
where
T: Send,
E: Send,
V: FromStream<T>,
{
/// Takes each element in the stream: if it is an `Err`, no further
Expand All @@ -30,7 +32,10 @@ where
#[inline]
fn from_stream<'a, S: IntoStream<Item = Result<T, E>> + 'a>(
stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send,
{
let stream = stream.into_stream();

Box::pin(async move {
Expand Down
5 changes: 4 additions & 1 deletion src/stream/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ pub trait Extend<A> {
fn extend<'a, T: IntoStream<Item = A> + 'a>(
&'a mut self,
stream: T,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
where
<T as IntoStream>::IntoStream: Send;
}

/// Extends a collection with the contents of a stream.
Expand Down Expand Up @@ -69,6 +71,7 @@ pub async fn extend<'a, C, T, S>(collection: &mut C, stream: S)
where
C: Extend<T>,
S: IntoStream<Item = T> + 'a,
<S as IntoStream>::IntoStream: Send,
{
Extend::extend(collection, stream).await
}
13 changes: 9 additions & 4 deletions src/stream/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ use crate::stream::IntoStream;
/// impl FromStream<i32> for MyCollection {
/// fn from_stream<'a, S: IntoStream<Item = i32> + 'a>(
/// stream: S,
/// ) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
/// ) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
/// where
/// <S as IntoStream>::IntoStream: Send,
/// {
/// let stream = stream.into_stream();
///
/// Box::pin(async move {
Expand Down Expand Up @@ -107,12 +110,12 @@ use crate::stream::IntoStream;
/// assert_eq!(c.0, vec![5, 5, 5, 5, 5]);
/// #
/// # Ok(()) }) }
///```
/// ```
///
/// [`IntoStream`]: trait.IntoStream.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait FromStream<T> {
pub trait FromStream<T: Send> {
/// Creates a value from a stream.
///
/// # Examples
Expand All @@ -135,5 +138,7 @@ pub trait FromStream<T> {
/// ```
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>>;
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
where
<S as IntoStream>::IntoStream: Send;
}
5 changes: 3 additions & 2 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1888,10 +1888,11 @@ extension_trait! {
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn collect<'a, B>(
self,
) -> impl Future<Output = B> + 'a [Pin<Box<dyn Future<Output = B> + 'a>>]
) -> impl Future<Output = B> + 'a [Pin<Box<dyn Future<Output = B> + 'a + Send>>]
where
Self: Sized + 'a,
Self: Sized + 'a + Send,
B: FromStream<Self::Item>,
Self::Item: Send,
{
FromStream::from_stream(self)
}
Expand Down
Loading