-
Notifications
You must be signed in to change notification settings - Fork 195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove futures_core::stream::Stream
from public API
#2910
Changes from 24 commits
6b308e4
9a27075
f5b440c
2bcf9b5
6e31754
181440d
f00d605
3e05c47
8e90987
492e8bf
2c7541b
f7e8a69
f3dae46
3aae470
3944bb3
fd70e49
ae047e5
50fe74c
ccf9d06
d31669b
f8c0c40
66420d5
d853cb1
5f65dff
6abb519
0491221
bedfb04
5698ee3
d9052d5
b281aee
d0452a0
0f5f6be
d2e4411
a778886
292a51b
ab57dce
3db0525
91163de
f5c8a96
35a1fca
852211f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,6 @@ | |
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
use async_stream::stream; | ||
use aws_sdk_transcribestreaming::config::{Credentials, Region}; | ||
use aws_sdk_transcribestreaming::error::SdkError; | ||
use aws_sdk_transcribestreaming::operation::start_stream_transcription::StartStreamTranscriptionOutput; | ||
|
@@ -13,23 +12,29 @@ use aws_sdk_transcribestreaming::types::{ | |
AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream, | ||
}; | ||
use aws_sdk_transcribestreaming::{Client, Config}; | ||
use aws_smithy_async::future::fn_stream::FnStream; | ||
use aws_smithy_client::dvr::{Event, ReplayingConnection}; | ||
use aws_smithy_eventstream::frame::{DecodedFrame, HeaderValue, Message, MessageFrameDecoder}; | ||
use bytes::BufMut; | ||
use futures_core::Stream; | ||
use std::collections::{BTreeMap, BTreeSet}; | ||
use std::error::Error as StdError; | ||
|
||
const CHUNK_SIZE: usize = 8192; | ||
|
||
#[tokio::test] | ||
async fn test_success() { | ||
let input_stream = stream! { | ||
let pcm = pcm_data(); | ||
for chunk in pcm.chunks(CHUNK_SIZE) { | ||
yield Ok(AudioStream::AudioEvent(AudioEvent::builder().audio_chunk(Blob::new(chunk)).build())); | ||
} | ||
}; | ||
let input_stream = FnStream::new(|tx| { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Out of curiosity. From https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/fn_stream/struct.FnStream.html:
Why is the channel 1-bounded? My understanding is that the user sends stream events to the channel and the client polls and dequeues them to send them through the network. What would be bad about allowing the channel to be unbounded? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FnStream is an implementation of generators–the network isn't involved in all use cases. By bounding it, you all the caller to avoid using more memory than is necessary |
||
Box::pin(async move { | ||
let pcm = pcm_data(); | ||
for chunk in pcm.chunks(CHUNK_SIZE) { | ||
tx.send(Ok(AudioStream::AudioEvent( | ||
AudioEvent::builder().audio_chunk(Blob::new(chunk)).build(), | ||
))) | ||
.await | ||
.expect("send should succeed"); | ||
} | ||
}) | ||
}); | ||
let (replayer, mut output) = | ||
start_request("us-west-2", include_str!("success.json"), input_stream).await; | ||
|
||
|
@@ -65,12 +70,18 @@ async fn test_success() { | |
|
||
#[tokio::test] | ||
async fn test_error() { | ||
let input_stream = stream! { | ||
let pcm = pcm_data(); | ||
for chunk in pcm.chunks(CHUNK_SIZE).take(1) { | ||
yield Ok(AudioStream::AudioEvent(AudioEvent::builder().audio_chunk(Blob::new(chunk)).build())); | ||
} | ||
}; | ||
let input_stream = FnStream::new(|tx| { | ||
Box::pin(async move { | ||
let pcm = pcm_data(); | ||
for chunk in pcm.chunks(CHUNK_SIZE).take(1) { | ||
tx.send(Ok(AudioStream::AudioEvent( | ||
AudioEvent::builder().audio_chunk(Blob::new(chunk)).build(), | ||
))) | ||
.await | ||
.expect("send should succeed"); | ||
} | ||
}) | ||
}); | ||
let (replayer, mut output) = | ||
start_request("us-east-1", include_str!("error.json"), input_stream).await; | ||
|
||
|
@@ -97,7 +108,7 @@ async fn test_error() { | |
async fn start_request( | ||
region: &'static str, | ||
events_json: &str, | ||
input_stream: impl Stream<Item = Result<AudioStream, AudioStreamError>> + Send + Sync + 'static, | ||
input_stream: FnStream<Result<AudioStream, AudioStreamError>>, | ||
) -> (ReplayingConnection, StartStreamTranscriptionOutput) { | ||
let events: Vec<Event> = serde_json::from_str(events_json).unwrap(); | ||
let replayer = ReplayingConnection::new(events); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -141,8 +141,9 @@ class PaginatorGenerator private constructor( | |
|
||
/// Create the pagination stream | ||
/// | ||
/// _Note:_ No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next)). | ||
pub fn send(self) -> impl #{Stream}<Item = #{item_type}> + #{Unpin} { | ||
/// _Note:_ No requests will be dispatched until the stream is used | ||
/// (e.g. with [`.next().await`](aws_smithy_async::future::fn_stream::FnStream::next)). | ||
pub fn send(self) -> #{fn_stream}::FnStream<#{item_type}> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmmm...not sure I like this. It's pretty constraining and also exposes a weird name to customers. I think we should wrap this in a newtype so we aren't publicly exposing the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We can create a smaller PR just to remove The above |
||
// Move individual fields out of self for the borrow checker | ||
let builder = self.builder; | ||
let handle = self.handle; | ||
|
@@ -257,10 +258,11 @@ class PaginatorGenerator private constructor( | |
impl ${paginatorName}Items { | ||
/// Create the pagination stream | ||
/// | ||
/// _Note: No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next))._ | ||
/// _Note_: No requests will be dispatched until the stream is used | ||
/// (e.g. with [`.next().await`](aws_smithy_async::future::fn_stream::FnStream::next)). | ||
/// | ||
/// To read the entirety of the paginator, use [`.collect::<Result<Vec<_>, _>()`](tokio_stream::StreamExt::collect). | ||
pub fn send(self) -> impl #{Stream}<Item = #{item_type}> + #{Unpin} { | ||
/// To read the entirety of the paginator, use [`.collect::<Result<Vec<_>, _>()`](aws_smithy_async::future::fn_stream::FnStream::collect). | ||
pub fn send(self) -> #{fn_stream}::FnStream<#{item_type}> { | ||
#{fn_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter()) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fn_steram
->fn_stream
.