Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream implementation (wrapper) for PaginationStream #3299

Merged
merged 10 commits into from
Jan 2, 2024

Conversation

Ploppz
Copy link
Contributor

@Ploppz Ploppz commented Dec 8, 2023

Motivation and Context

awslabs/aws-sdk-rust#995

Description

I tried to implement futures::Stream for a wrapper struct around PaginationStream. I am unsure if I did it in the best way. After fighting with the borrow checker for a while I decided to try Arc<Mutex<_>> - is this the way to go or does there exist a better way?
Even then, does the code look correct? I used it in my project and my integration tests do pass but I am not 100% that these tests will catch any error in paginated ListObjectsV2.

I would appreciate any feedback so far.

Testing

In progress while waiting for feedback on code

Checklist

  • I have updated CHANGELOG.next.toml if I made changes to the smithy-rs codegen or runtime crates

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@Ploppz Ploppz requested review from a team as code owners December 8, 2023 11:20
@rcoh
Copy link
Collaborator

rcoh commented Dec 8, 2023

for a number of reasons, I think that is the best you can do. Since you need to box the future, you then can't know for sure that the stream is alive long enough.

Luckily, there is a better way! We can expose poll_next on PaginationStream so that we can delegate to that. I've updated your branch with these changes.

I think there are a couple of remaining items we should do:

  1. Add an extension trait so you can do something like:
s3.list_buckets().into_paginator().send().as_stream_03x().map(...) // and other stream trait useful functions
  1. Add a changelog entry so folks know this new functionality is available!

@Ploppz
Copy link
Contributor Author

Ploppz commented Dec 11, 2023

Thanks! Some questions:

  • Why do you use tokio-stream rather than futures? I guess I will change the name of the module futures.rs
  • What is 03x in the function name?
  • I assume at this point the code is so simply tests are not needed

@rcoh
Copy link
Collaborator

rcoh commented Dec 11, 2023

Thanks! Some questions:

  • Why do you use tokio-stream rather than futures? I guess I will change the name of the module futures.rs
  • no specific reason—it's a re-export but has Tokio's StreamExt which has a lot of useful combinators.
  • What is 03x in the function name?
  • The stream trait is in futures core=0.3 — although this is a good point, it should probably match the version of the tokio-stream library, 0.1, so 01x. We use this naming convention on pre-1.0 libraries so that if another version of the library is released we can create a matching method.
  • I assume at this point the code is so simply tests are not needed
    Yeah. Although it couldn't hurt to validate that you read the same items with and without it, or even just validate that things compile as one would expect.

@Ploppz Ploppz changed the title DRAFT: futures::Stream implementation (wrapper) for PaginationStream Stream implementation (wrapper) for PaginationStream Dec 12, 2023
@Ploppz
Copy link
Contributor Author

Ploppz commented Dec 12, 2023

Does it look good now?

@Ploppz
Copy link
Contributor Author

Ploppz commented Dec 12, 2023

Btw, if tokio-stream re-exports futures::Stream and we don't even use StreamExt for anything, then that means we simply bring in more dependencies than necessary. Shouldn't we just use futures directly?

@abusch
Copy link
Contributor

abusch commented Dec 14, 2023

Nice, that would be useful to have! In the meantime we've done something pretty similar, which seems to work for us (plus an extension trait to add a .into_stream() method):

broken code
pub struct PaginationStreamAdapter<T>(PaginationStream<T>);

impl<T> futures::stream::Stream for PaginationStreamAdapter<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let fut = self.0.next();
        pin_mut!(fut);
        fut.poll(cx)
    }
}

@Ploppz
Copy link
Contributor Author

Ploppz commented Dec 14, 2023

@abusch I'm not sure but isn't it wrong to call next() in every call to poll_next?
Even if it somehow works, it seems wrong to me. I might be wrong though but here's my line of thought:
The async system will keep polling to check the status of whether the next item is ready, and every time it does that, internally, your impl will call the next() function which I believe (these semantics would make sense to me) should be waited for completion until it's called the next time. If you are lucky, it's always ready when polled, maybe that's why it works for you?

@rcoh
Copy link
Collaborator

rcoh commented Dec 14, 2023 via email

@abusch
Copy link
Contributor

abusch commented Dec 14, 2023

Hmm, yeah, you might be right 😓

@rcoh
Copy link
Collaborator

rcoh commented Dec 14, 2023

yeah that's fair. In that case, I think we should use future_core stream directly: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html (futures::Stream) is just yet another re-export of this.

We should probably include a comment suggesting that tokio_stream is where most of the useful utilities live, though

Copy link
Collaborator

@rcoh rcoh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is looking great! just a couple of suggestions then we'll get this merged. Thanks for working on this, I think customers are really going to appreciate it.

rust-runtime/aws-smithy-types-convert/src/stream.rs Outdated Show resolved Hide resolved
CHANGELOG.next.toml Outdated Show resolved Hide resolved
@Ploppz Ploppz force-pushed the types-convert-stream branch from deea76e to 11f1e64 Compare December 14, 2023 15:31
@Ploppz
Copy link
Contributor Author

Ploppz commented Dec 14, 2023

I implemented the suggested changes, and rebased onto latest upstream main branch to fix conflicts. Also removed a trait bound that was unnecessary. Couldn't write very helpful code in the rustdoc unfortunately because we don't have a lot of dependencies to work with there.

CHANGELOG.next.toml Outdated Show resolved Hide resolved
Co-authored-by: Russell Cohen <russell.r.cohen@gmail.com>
CHANGELOG.next.toml Outdated Show resolved Hide resolved
CHANGELOG.next.toml Outdated Show resolved Hide resolved
@rcoh rcoh enabled auto-merge December 20, 2023 16:47
Add futures_core::stream to external types
@rcoh rcoh added this pull request to the merge queue Jan 2, 2024
Merged via the queue into smithy-lang:main with commit 7541fe7 Jan 2, 2024
38 of 39 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants