Deal with S3 download failures by streaming files #4928
private[client] class Fs2StreamAsyncResponseTransformer
    extends AsyncResponseTransformer[GetObjectResponse, Publisher[ByteBuffer]] {

  private val cf = new CompletableFuture[Publisher[ByteBuffer]]()
  override def prepare(): CompletableFuture[Publisher[ByteBuffer]] = cf

  override def onResponse(response: GetObjectResponse): Unit = ()

  override def onStream(publisher: SdkPublisher[ByteBuffer]): Unit = {
    cf.complete(publisher)
    ()
  }

  override def exceptionOccurred(error: Throwable): Unit = {
    cf.completeExceptionally(error)
    ()
  }
}
This is the main part of the functionality
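The transformer above hands the response body to the caller by completing a CompletableFuture with the publisher, or failing it on error. The following is a minimal sketch of that handoff using only the JDK's java.util.concurrent.Flow types; PublisherHandoff and HandoffSketch are hypothetical names for illustration and are not part of the AWS SDK or this PR.

```scala
import java.util.concurrent.{CompletableFuture, Flow, SubmissionPublisher}

object HandoffSketch {
  // Hypothetical, simplified stand-in for the callbacks of an async response transformer.
  final class PublisherHandoff[A] {
    private val cf = new CompletableFuture[Flow.Publisher[A]]()

    // The caller obtains the future before any data arrives.
    def prepare(): CompletableFuture[Flow.Publisher[A]] = cf

    // Called once the body is available: complete the future with the publisher.
    def onStream(publisher: Flow.Publisher[A]): Unit = {
      cf.complete(publisher)
      ()
    }

    // Called on failure: the caller sees the error through the same future.
    def exceptionOccurred(error: Throwable): Unit = {
      cf.completeExceptionally(error)
      ()
    }
  }

  def main(args: Array[String]): Unit = {
    val ok     = new PublisherHandoff[Int]
    val future = ok.prepare()
    val body   = new SubmissionPublisher[Int]()
    ok.onStream(body)
    println(future.isDone) // true
    body.close()

    val failed       = new PublisherHandoff[Int]
    val failedFuture = failed.prepare()
    failed.exceptionOccurred(new RuntimeException("download failed"))
    println(failedFuture.isCompletedExceptionally) // true
  }
}
```

Nothing is read from the publisher at this point; the consumer decides when, and how fast, to subscribe.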
Stream
  .eval(client.getObject(getObjectRequest(bucket, fileKey), new Fs2StreamAsyncResponseTransformer))
  .flatMap(_.toStreamBuffered[IO](2).flatMap(bb => Stream.chunk(Chunk.byteBuffer(bb))))
This is where the streaming is happening
What is the 2 for?
It's the buffer size. It means the stream will ask the publisher for 2 ByteBuffers at a time.
(It was taken from the example you gave, it's not a value I chose)
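To illustrate what "asking for 2 at a time" means in terms of demand, here is a minimal sketch built only on the JDK's java.util.concurrent.Flow API. It is an analogy, not the fs2 implementation: BatchingSubscriber and BufferDemandSketch are hypothetical names, and Int elements stand in for ByteBuffers.

```scala
import java.util.concurrent.{CountDownLatch, Flow, SubmissionPublisher}
import scala.collection.mutable.ListBuffer

object BufferDemandSketch {
  // Hypothetical subscriber that requests `bufferSize` elements, waits until
  // all of them have arrived, then requests the next batch.
  final class BatchingSubscriber[A](bufferSize: Int) extends Flow.Subscriber[A] {
    private var subscription: Flow.Subscription = null
    private var pending                         = 0
    val received: ListBuffer[A]                 = ListBuffer.empty
    val requests: ListBuffer[Long]              = ListBuffer.empty
    val done                                    = new CountDownLatch(1)

    private def requestBatch(): Unit = {
      pending = bufferSize
      requests += bufferSize.toLong
      subscription.request(bufferSize.toLong)
    }

    override def onSubscribe(s: Flow.Subscription): Unit = {
      subscription = s
      requestBatch()
    }

    override def onNext(item: A): Unit = {
      received += item
      pending -= 1
      if (pending == 0) requestBatch()
    }

    override def onError(t: Throwable): Unit = done.countDown()
    override def onComplete(): Unit          = done.countDown()
  }

  // Publishes 1 to 5 and returns what was received and the demand signalled.
  def run(): (List[Int], List[Long]) = {
    val publisher  = new SubmissionPublisher[Int]()
    val subscriber = new BatchingSubscriber[Int](2)
    publisher.subscribe(subscriber)
    (1 to 5).foreach(i => publisher.submit(i))
    publisher.close()
    subscriber.done.await()
    (subscriber.received.toList, subscriber.requests.toList)
  }

  def main(args: Array[String]): Unit = {
    val (received, requests) = run()
    println(received) // List(1, 2, 3, 4, 5)
    println(requests) // List(2, 2, 2)
  }
}
```

A small batch size keeps at most a couple of buffers in flight, which bounds memory use when the file is large; a larger value trades memory for fewer demand round trips.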
import fs2.Stream
import software.amazon.awssdk.services.s3.model._

private[client] object S3StorageClientDisabled extends S3StorageClient {
Not to be addressed in this PR, and not a priority, but we may want to get rid of this one.
As long as you can't create an S3 storage when the feature is disabled, that might be enough?
I agree that this probably shouldn't be needed
sourceKey: Models.FileKey,
destinationBucket: Models.BucketName,
destinationKey: Models.FileKey,
sourceBucket: String,
It would still be nice to reintroduce something more refined than strings at some point
I think the problem with these types specifically was that whenever you needed to use them as a string you had to call .value.value, which is annoying, and if you forget to do it (in string interpolation, for example) you get a bad result. There might be other options that don't have these issues.
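A minimal sketch of the interpolation pitfall described above. NonEmptyStr and BucketName here are hypothetical stand-ins for the nested wrapper types, not the real Models definitions, and Bucket is just one possible alternative (an assumption, not something this PR proposes).

```scala
object KeyTypesSketch {
  // Hypothetical stand-ins: a validated string nested inside a domain wrapper.
  final case class NonEmptyStr(value: String)
  final case class BucketName(value: NonEmptyStr)

  // One possible alternative: a single wrapper whose toString is the raw value,
  // so forgetting to unwrap in an interpolated string is harmless.
  final case class Bucket(value: String) {
    override def toString: String = value
  }

  def forgotten(b: BucketName): String = s"s3://$b/file.txt"
  def unwrapped(b: BucketName): String = s"s3://${b.value.value}/file.txt"
  def flat(b: Bucket): String          = s"s3://$b/file.txt"

  def main(args: Array[String]): Unit = {
    val nested = BucketName(NonEmptyStr("my-bucket"))
    val simple = Bucket("my-bucket")
    println(forgotten(nested)) // s3://BucketName(NonEmptyStr(my-bucket))/file.txt
    println(unwrapped(nested)) // s3://my-bucket/file.txt
    println(flat(simple))      // s3://my-bucket/file.txt
  }
}
```

The first call compiles without complaint, which is what makes the mistake easy to miss.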
@@ -67,7 +67,7 @@ object S3FileOperations {
        )
        .compile
Shouldn't this go away now?
Did you try testing with a big-ish file?
I've removed the compile; a test will arrive soon.
5fb52fa to a37058e
a37058e to 844d557
The diff here is big but there are not many actual changes. The changes are:
- Using an AsyncResponseTransformer (and some processing of what is returned in the readFile method) to actually stream when fetching a file
- Removing multiPartRead since we can stream the file normally now
- Removing BucketName/FileKey as they are no longer used
- Splitting up S3StorageClient as the file was much too big. Privacy rules are used to ensure users cannot construct impls directly (this was actually happening before; my understanding is we prefer not to do this)