Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove futures_core::stream::Stream from public API #2910

Closed
wants to merge 41 commits into from
Closed
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
6b308e4
Remove `futures_core::stream::Stream` from `smithy-async`
ysaito1001 Aug 8, 2023
9a27075
Remove `futures_core::stream::Stream` from `smithy-http`
ysaito1001 Aug 8, 2023
f5b440c
Remove `futures_core::stream::Stream` from `Paginator`
ysaito1001 Aug 9, 2023
2bcf9b5
Remove `futures_core::stream::Stream` from integration tests
ysaito1001 Aug 9, 2023
6e31754
Merge branch 'main' into ysaito/remove-futures-stream-from-public-api
ysaito1001 Aug 9, 2023
181440d
Fix links in docs
ysaito1001 Aug 9, 2023
f00d605
Fix codegen-server-test:test
ysaito1001 Aug 9, 2023
3e05c47
Fix tests in CI
ysaito1001 Aug 11, 2023
8e90987
Update pokemon example to use `FnStream`
ysaito1001 Aug 11, 2023
492e8bf
Fix more server tests
ysaito1001 Aug 11, 2023
2c7541b
Fix test name that checks both `Send` and `Sync`
ysaito1001 Aug 11, 2023
f7e8a69
Fix event-streaming tests in pokemon-service
ysaito1001 Aug 11, 2023
f3dae46
Let ser_* return concreate types instead of `impl Stream`
ysaito1001 Aug 11, 2023
3aae470
Port `event_stream_input_ergonomics` test
ysaito1001 Aug 11, 2023
3944bb3
Remove unused dependency from `aws-smithy-async`
ysaito1001 Aug 11, 2023
fd70e49
Update `pokemon-service-test` to use `FnStream`
ysaito1001 Aug 11, 2023
ae047e5
Remove unused `tokio_stream::StreamExt` from `canary-lambda`
ysaito1001 Aug 11, 2023
50fe74c
Tell udeps `futures_util` is used
ysaito1001 Aug 11, 2023
ccf9d06
Update latest for `canary-lambda`
ysaito1001 Aug 15, 2023
d31669b
Append `release-2023-08-03` to `NOTABLE_SDK_RELEASE_TAGS`
ysaito1001 Aug 16, 2023
f8c0c40
Add dependency on `aws-smithy-async`
ysaito1001 Aug 25, 2023
66420d5
Merge branch 'main' into ysaito/remove-futures-stream-from-public-api
ysaito1001 Aug 25, 2023
d853cb1
Remove 2023_01_26 that is more than 3 releases ago
ysaito1001 Aug 25, 2023
5f65dff
Update CHANGELOG.next.toml
ysaito1001 Aug 25, 2023
6abb519
Remove code duplication in calling `renderEventStreamBody`
ysaito1001 Aug 28, 2023
0491221
Remove uncecessary `CollectablePrivate`
ysaito1001 Aug 29, 2023
bedfb04
Use customization to wrap stream payload in new-type
ysaito1001 Aug 30, 2023
5698ee3
Let `finalize` take an owned associated collection
ysaito1001 Aug 30, 2023
d9052d5
Reexport `FnStream` and remove dependency on `aws-smithy-async`
ysaito1001 Aug 30, 2023
b281aee
Remove dependency on `aws-smithy-async` and use reexported `FnStream`
ysaito1001 Aug 30, 2023
d0452a0
Merge branch 'main' into ysaito/remove-futures-stream-from-public-api
ysaito1001 Aug 30, 2023
0f5f6be
Return size hint as is by `http_body::Body::size_hint`
ysaito1001 Aug 30, 2023
d2e4411
Add an comment `FnStream` is `Send` not `Sync`
ysaito1001 Aug 30, 2023
a778886
Add more comments as to why `generator` can be set to `None`
ysaito1001 Aug 30, 2023
292a51b
Merge branch 'main' into ysaito/remove-futures-stream-from-public-api
ysaito1001 Aug 30, 2023
ab57dce
Convert `StreamPayloadSerializer` to interface
ysaito1001 Aug 31, 2023
3db0525
Add comments to data class for `ServerHttpBoundProtocolSection`
ysaito1001 Aug 31, 2023
91163de
Use `ServiceShape.hasEventStreamOperations`
ysaito1001 Aug 31, 2023
f5c8a96
Move `hyper_body_wrap_stream` from inlineable to `aws-smithy-http`
ysaito1001 Sep 7, 2023
35a1fca
Merge branch 'main' into ysaito/remove-futures-stream-from-public-api
ysaito1001 Sep 8, 2023
852211f
Fix undeclared crate or module for feature-gated types
ysaito1001 Sep 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,21 @@ message = "Update MSRV to Rust 1.70.0"
references = ["smithy-rs#2948"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all" }
author = "Velfi"

[[aws-sdk-rust]]
message = """
The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases, but it does require code upgrade for a small number of cases. The notable example is Transcribe streaming when streaming data is created via a `stream!` macro from the `async-stream` crate. The use of that macro needs to be replaced with `aws_smithy_async::future::fn_stream::FnStream`. See https://github.com/awslabs/smithy-rs/discussions/2952 for more details.
"""
references = ["smithy-rs#2910"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "ysaito1001"

[[smithy-rs]]
message = """
The `futures_core::stream::Stream` trait has been removed from public API. The methods that were made available through the `Stream` trait have been removed from these types. However, we have preserved `.next()` and `.collect()` to continue supporting existing call sites in `smithy-rs` and `aws-sdk-rust`, including tests and rustdocs. If we need to support missing stream operations, we are planning to do so in an additive, backward compatible manner.

If your code uses a `stream!` macro from the `async_stream` crate to generate stream data, it needs to be replaced by `aws_smithy_async::future::fn_steram::FnStream`. See https://github.com/awslabs/smithy-rs/discussions/2952 for more details.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fn_steram -> fn_stream.

"""
references = ["smithy-rs#2910"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all" }
author = "ysaito1001"
1 change: 0 additions & 1 deletion aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use bytes::Buf;
use bytes_utils::SegmentedBuf;
use http::header::HeaderName;
use ring::digest::{Context, Digest, SHA256};
use tokio_stream::StreamExt;

const TREE_HASH_HEADER: &str = "x-amz-sha256-tree-hash";
const X_AMZ_CONTENT_SHA256: &str = "x-amz-content-sha256";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class TranscribeTestDependencies : LibRsCustomization() {
override fun section(section: LibRsSection): Writable =
writable {
addDependency(AsyncStream)
addDependency(FuturesCore)
addDependency(FuturesCore.toDevDependency())
addDependency(Hound)
}
}
Expand Down
2 changes: 0 additions & 2 deletions aws/sdk/integration-tests/dynamodb/tests/paginators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
use std::collections::HashMap;
use std::iter::FromIterator;

use tokio_stream::StreamExt;

use aws_credential_types::Credentials;
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::{Client, Config};
Expand Down
2 changes: 0 additions & 2 deletions aws/sdk/integration-tests/ec2/tests/paginators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

use tokio_stream::StreamExt;

use aws_sdk_ec2::{config::Credentials, config::Region, types::InstanceType, Client, Config};
use aws_smithy_client::http_connector::HttpConnector;
use aws_smithy_client::test_connection::TestConnection;
Expand Down
2 changes: 1 addition & 1 deletion aws/sdk/integration-tests/transcribestreaming/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ repository = "https://github.com/awslabs/smithy-rs"
publish = false

[dev-dependencies]
async-stream = "0.3.0"
aws-credential-types = { path = "../../build/aws-sdk/sdk/aws-credential-types", features = ["test-util"] }
aws-http = { path = "../../build/aws-sdk/sdk/aws-http" }
aws-sdk-transcribestreaming = { path = "../../build/aws-sdk/sdk/transcribestreaming" }
aws-smithy-async = { path = "../../build/aws-sdk/sdk/aws-smithy-async" }
aws-smithy-client = { path = "../../build/aws-sdk/sdk/aws-smithy-client", features = ["test-util", "rustls"] }
aws-smithy-eventstream = { path = "../../build/aws-sdk/sdk/aws-smithy-eventstream" }
aws-smithy-http = { path = "../../build/aws-sdk/sdk/aws-smithy-http" }
Expand Down
41 changes: 26 additions & 15 deletions aws/sdk/integration-tests/transcribestreaming/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

use async_stream::stream;
use aws_sdk_transcribestreaming::config::{Credentials, Region};
use aws_sdk_transcribestreaming::error::SdkError;
use aws_sdk_transcribestreaming::operation::start_stream_transcription::StartStreamTranscriptionOutput;
Expand All @@ -13,23 +12,29 @@ use aws_sdk_transcribestreaming::types::{
AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream,
};
use aws_sdk_transcribestreaming::{Client, Config};
use aws_smithy_async::future::fn_stream::FnStream;
use aws_smithy_client::dvr::{Event, ReplayingConnection};
use aws_smithy_eventstream::frame::{DecodedFrame, HeaderValue, Message, MessageFrameDecoder};
use bytes::BufMut;
use futures_core::Stream;
use std::collections::{BTreeMap, BTreeSet};
use std::error::Error as StdError;

const CHUNK_SIZE: usize = 8192;

#[tokio::test]
async fn test_success() {
let input_stream = stream! {
let pcm = pcm_data();
for chunk in pcm.chunks(CHUNK_SIZE) {
yield Ok(AudioStream::AudioEvent(AudioEvent::builder().audio_chunk(Blob::new(chunk)).build()));
}
};
let input_stream = FnStream::new(|tx| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity. From https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/fn_stream/struct.FnStream.html:

Because the stream is 1-bounded, the function will not proceed until the stream is read.

Why is the channel 1-bounded? My understanding is that the user sends stream events to the channel and the client polls and dequeues them to send them through the network. What would be bad about allowing the channel to be unbounded?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FnStream is an implementation of generators–the network isn't involved in all use cases. By bounding it, you all the caller to avoid using more memory than is necessary

Box::pin(async move {
let pcm = pcm_data();
for chunk in pcm.chunks(CHUNK_SIZE) {
tx.send(Ok(AudioStream::AudioEvent(
AudioEvent::builder().audio_chunk(Blob::new(chunk)).build(),
)))
.await
.expect("send should succeed");
}
})
});
let (replayer, mut output) =
start_request("us-west-2", include_str!("success.json"), input_stream).await;

Expand Down Expand Up @@ -65,12 +70,18 @@ async fn test_success() {

#[tokio::test]
async fn test_error() {
let input_stream = stream! {
let pcm = pcm_data();
for chunk in pcm.chunks(CHUNK_SIZE).take(1) {
yield Ok(AudioStream::AudioEvent(AudioEvent::builder().audio_chunk(Blob::new(chunk)).build()));
}
};
let input_stream = FnStream::new(|tx| {
Box::pin(async move {
let pcm = pcm_data();
for chunk in pcm.chunks(CHUNK_SIZE).take(1) {
tx.send(Ok(AudioStream::AudioEvent(
AudioEvent::builder().audio_chunk(Blob::new(chunk)).build(),
)))
.await
.expect("send should succeed");
}
})
});
let (replayer, mut output) =
start_request("us-east-1", include_str!("error.json"), input_stream).await;

Expand All @@ -97,7 +108,7 @@ async fn test_error() {
async fn start_request(
region: &'static str,
events_json: &str,
input_stream: impl Stream<Item = Result<AudioStream, AudioStreamError>> + Send + Sync + 'static,
input_stream: FnStream<Result<AudioStream, AudioStreamError>>,
) -> (ReplayingConnection, StartStreamTranscriptionOutput) {
let events: Vec<Event> = serde_json::from_str(events_json).unwrap();
let replayer = ReplayingConnection::new(events);
Expand Down
8 changes: 0 additions & 8 deletions aws/sdk/sdk-external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ allowed_external_types = [
"http::uri::Uri",
"http::method::Method",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Switch to AsyncIterator once standardized
"futures_core::stream::Stream",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Once tooling permits it, only allow the following types in the `event-stream` feature
"aws_smithy_eventstream::*",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Decide if we want to continue exposing tower_layer
"tower_layer::Layer",
"tower_layer::identity::Identity",
"tower_layer::stack::Stack",
]
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ class PaginatorGenerator private constructor(

/// Create the pagination stream
///
/// _Note:_ No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next)).
pub fn send(self) -> impl #{Stream}<Item = #{item_type}> + #{Unpin} {
/// _Note:_ No requests will be dispatched until the stream is used
/// (e.g. with [`.next().await`](aws_smithy_async::future::fn_stream::FnStream::next)).
pub fn send(self) -> #{fn_stream}::FnStream<#{item_type}> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm...not sure I like this. It's pretty constraining and also exposes a weird name to customers. I think we should wrap this in a newtype so we aren't publicly exposing the FnStream internals

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this PR much smaller to start piecing off individual chunks?

We can create a smaller PR just to remove impl Stream from aws-smithy-async and but keep it for event streaming (that'll take care of Burak & David's questions around FnStream since the code around event stream will be unchanged).

The above send methods needs to return a new-type hiding FnStream.

// Move individual fields out of self for the borrow checker
let builder = self.builder;
let handle = self.handle;
Expand Down Expand Up @@ -257,10 +258,11 @@ class PaginatorGenerator private constructor(
impl ${paginatorName}Items {
/// Create the pagination stream
///
/// _Note: No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next))._
/// _Note_: No requests will be dispatched until the stream is used
/// (e.g. with [`.next().await`](aws_smithy_async::future::fn_stream::FnStream::next)).
///
/// To read the entirety of the paginator, use [`.collect::<Result<Vec<_>, _>()`](tokio_stream::StreamExt::collect).
pub fn send(self) -> impl #{Stream}<Item = #{item_type}> + #{Unpin} {
/// To read the entirety of the paginator, use [`.collect::<Result<Vec<_>, _>()`](aws_smithy_async::future::fn_stream::FnStream::collect).
pub fn send(self) -> #{fn_stream}::FnStream<#{item_type}> {
#{fn_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ class FluentClientGenerator(
"""
/// Create a paginator for this request
///
/// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a `Stream`.
/// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns an [`FnStream`](aws_smithy_async::future::fn_stream::FnStream).
pub fn into_paginator(self) -> #{Paginator} {
#{Paginator}::new(self.handle, self.inner)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ class ClientHttpBoundProtocolPayloadGenerator(
_cfg.interceptor_state().store_put(signer_sender);
let adapter: #{aws_smithy_http}::event_stream::MessageStreamAdapter<_, _> =
${params.outerName}.${params.memberName}.into_body_stream(marshaller, error_marshaller, signer);
let body: #{SdkBody} = #{hyper}::Body::wrap_stream(adapter).into();
let body: #{SdkBody} = #{hyper}::Body::wrap_stream(#{HyperBodyWrapEventStream}::new(adapter)).into();
body
}
""",
"hyper" to CargoDependency.HyperWithStream.toType(),
"SdkBody" to RuntimeType.sdkBody(codegenContext.runtimeConfig),
"aws_smithy_http" to RuntimeType.smithyHttp(codegenContext.runtimeConfig),
"DeferredSigner" to RuntimeType.smithyEventStream(codegenContext.runtimeConfig).resolve("frame::DeferredSigner"),
"DeferredSigner" to RuntimeType.smithyEventStream(codegenContext.runtimeConfig)
.resolve("frame::DeferredSigner"),
"HyperBodyWrapEventStream" to RuntimeType.hyperBodyWrapStream(codegenContext.runtimeConfig)
.resolve("HyperBodyWrapEventStream"),
"marshallerConstructorFn" to params.marshallerConstructorFn,
"errorMarshallerConstructorFn" to params.errorMarshallerConstructorFn,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ class InlineDependency(
CargoDependency.smithyTypes(runtimeConfig),
)

fun hyperBodyWrapStream(runtimeConfig: RuntimeConfig): InlineDependency = forInlineableRustFile(
"hyper_body_wrap_stream",
CargoDependency.smithyHttp(runtimeConfig).withFeature("event-stream"),
CargoDependency.FuturesCore,
CargoDependency.smithyAsync(runtimeConfig).toDevDependency(),
CargoDependency.smithyEventStream(runtimeConfig).toDevDependency(),
)

fun constrained(): InlineDependency =
InlineDependency.forRustFile(ConstrainedModule, "/inlineable/src/constrained.rs")
}
Expand Down Expand Up @@ -227,6 +235,7 @@ data class CargoDependency(
val Bytes: CargoDependency = CargoDependency("bytes", CratesIo("1.0.0"))
val BytesUtils: CargoDependency = CargoDependency("bytes-utils", CratesIo("0.1.0"))
val FastRand: CargoDependency = CargoDependency("fastrand", CratesIo("2.0.0"))
val FuturesCore: CargoDependency = CargoDependency("futures-core", CratesIo("0.3.25"))
val Hex: CargoDependency = CargoDependency("hex", CratesIo("0.4.3"))
val Http: CargoDependency = CargoDependency("http", CratesIo("0.2.9"))
val HttpBody: CargoDependency = CargoDependency("http-body", CratesIo("0.4.4"))
Expand All @@ -246,7 +255,6 @@ data class CargoDependency(
val AsyncStd: CargoDependency = CargoDependency("async-std", CratesIo("1.12.0"), DependencyScope.Dev)
val AsyncStream: CargoDependency = CargoDependency("async-stream", CratesIo("0.3.0"), DependencyScope.Dev)
val Criterion: CargoDependency = CargoDependency("criterion", CratesIo("0.4.0"), DependencyScope.Dev)
val FuturesCore: CargoDependency = CargoDependency("futures-core", CratesIo("0.3.25"), DependencyScope.Dev)
val FuturesUtil: CargoDependency =
CargoDependency("futures-util", CratesIo("0.3.25"), DependencyScope.Dev, defaultFeatures = false)
val HdrHistogram: CargoDependency = CargoDependency("hdrhistogram", CratesIo("7.5.2"), DependencyScope.Dev)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,10 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null)
fun retryErrorKind(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("retry::ErrorKind")
fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::Receiver")

fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender")
fun hyperBodyWrapStream(runtimeConfig: RuntimeConfig): RuntimeType =
forInlineDependency(InlineDependency.hyperBodyWrapStream(runtimeConfig))

fun errorMetadata(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("error::ErrorMetadata")
fun errorMetadataBuilder(runtimeConfig: RuntimeConfig) =
Expand Down
Loading