Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java V2: Add examples for uploading a stream of unkown size in S3 #5019

Merged
merged 5 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .doc_gen/metadata/s3_metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2700,3 +2700,25 @@ s3_Scenario_URIParsing:
- s3.java2.scenario_uriparsing.main
services:
s3: {}
s3_Scenario_UploadStream:
title: Upload a stream of unknown size to an &S3; object using an &AWS; SDK
title_abbrev: Upload stream of unknown size
synopsis: upload a stream of unknown size to an &S3; object.
category: Scenarios
languages:
Java:
versions:
- sdk_version: 2
github: java/example_code/s3
sdkguide:
excerpts:
- description: Use the <ulink url="sdk-for-java/latest/developer-guide/crt-based-s3-client.html" type="documentation">&AWS; CRT-based S3 Client</ulink>.
snippet_tags:
- s3.java2.async_stream.import
- s3.java2.async_stream.main
- description: Use the <ulink url="sdk-for-java/latest/developer-guide/transfer-manager.html" type="documentation">&S3; Transfer Manager</ulink>.
snippet_tags:
- s3.tm.java2.upload_stream.import
- s3.tm.java2.upload_stream.main
services:
s3: {}
11 changes: 8 additions & 3 deletions javav2/example_code/s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<source>11</source>
<target>11</target>
scmacdon marked this conversation as resolved.
Show resolved Hide resolved
</configuration>
<!-- The following execution section process the plugin annotations for apache logging for the custom memory appender, MemoryLog4jAppender, used in the test for ParseUri.java -->
<executions>
Expand Down Expand Up @@ -109,7 +109,7 @@
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<version>0.21.12</version>
<version>0.22.2</version>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
Expand Down Expand Up @@ -149,5 +149,10 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.example.s3.async;

// snippet-start:[s3.java2.async_stream.complete]
// snippet-start:[s3.java2.async_stream.import]
import com.example.s3.util.AsyncExampleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.io.ByteArrayInputStream;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
// snippet-end:[s3.java2.async_stream.import]

public class PutObjectFromStreamAsync {
private static final Logger logger = LoggerFactory.getLogger(PutObjectFromStreamAsync.class);


public static void main(String[] args) {
String bucketName = "x-" + UUID.randomUUID();
String key = UUID.randomUUID().toString();

AsyncExampleUtils.createBucket(bucketName);
try {
PutObjectFromStreamAsync example = new PutObjectFromStreamAsync();
PutObjectResponse putObjectResponse = example.putObjectFromStream(AsyncExampleUtils.client, bucketName, key);
logger.info("Object {} etag: {}", key, putObjectResponse.eTag());
logger.info("Object {} uploaded to bucket {}.", key, bucketName);
} catch (SdkException e) {
logger.error(e.getMessage(), e);
} finally {
AsyncExampleUtils.deleteObject(bucketName, key);
AsyncExampleUtils.deleteBucket(bucketName);
}
}

// snippet-start:[s3.java2.async_stream.main]
/**
* @param s33CrtAsyncClient - To upload content from a stream of unknown size, use the AWS CRT-based S3 client. For more information, see
* https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/crt-based-s3-client.html.
* @param bucketName - The name of the bucket.
* @param key - The name of the object.
* @return software.amazon.awssdk.services.s3.model.PutObjectResponse - Returns metadata pertaining to the put object operation.
*/
public PutObjectResponse putObjectFromStream(S3AsyncClient s33CrtAsyncClient, String bucketName, String key) {

BlockingInputStreamAsyncRequestBody body =
AsyncRequestBody.forBlockingInputStream(null); // 'null' indicates a stream will be provided later.

CompletableFuture<PutObjectResponse> responseFuture =
s33CrtAsyncClient.putObject(r -> r.bucket(bucketName).key(key), body);

// AsyncExampleUtils.randomString() returns a random string up to 100 characters.
String randomString = AsyncExampleUtils.randomString();
logger.info("random string to upload: {}: length={}", randomString, randomString.length());

// Provide the stream of data to be uploaded.
body.writeInputStream(new ByteArrayInputStream(randomString.getBytes()));

PutObjectResponse response = responseFuture.join(); // Wait for the response.
logger.info("Object {} uploaded to bucket {}.", key, bucketName);
return response;
}
}
// snippet-end:[s3.java2.async_stream.main]
// snippet-end:[s3.java2.async_stream.complete]
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.example.s3.transfermanager;

// snippet-start:[s3.tm.java2.upload_stream.complete]
// snippet-start:[s3.tm.java2.upload_stream.import]
import com.example.s3.util.AsyncExampleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedUpload;
import software.amazon.awssdk.transfer.s3.model.Upload;

import java.io.ByteArrayInputStream;
import java.util.UUID;
// snippet-end:[s3.tm.java2.upload_stream.import]

public class UploadStream {
private static final Logger logger = LoggerFactory.getLogger(UploadStream.class);

public static void main(String[] args) {
String bucketName = "x-" + UUID.randomUUID();
String key = UUID.randomUUID().toString();

AsyncExampleUtils.createBucket(bucketName);
try {
UploadStream example = new UploadStream();
CompletedUpload completedUpload = example.uploadStream(S3TransferManager.create(), bucketName, key);
logger.info("Object {} etag: {}", key, completedUpload.response().eTag());
logger.info("Object {} uploaded to bucket {}.", key, bucketName);
} catch (SdkException e) {
logger.error(e.getMessage(), e);
} finally {
AsyncExampleUtils.deleteObject(bucketName, key);
AsyncExampleUtils.deleteBucket(bucketName);
}
}

// snippet-start:[s3.tm.java2.upload_stream.main]
/**
* @param transferManager - To upload content from a stream of unknown size, use the S3TransferManager based on the AWS CRT-based S3 client.
* For more information, see https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/transfer-manager.html.
* @param bucketName - The name of the bucket.
* @param key - The name of the object.
* @return - software.amazon.awssdk.transfer.s3.model.CompletedUpload - The result of the completed upload.
*/
public CompletedUpload uploadStream(S3TransferManager transferManager, String bucketName, String key) {

BlockingInputStreamAsyncRequestBody body =
AsyncRequestBody.forBlockingInputStream(null); // 'null' indicates a stream will be provided later.

Upload upload = transferManager.upload(builder -> builder
.requestBody(body)
.putObjectRequest(req -> req.bucket(bucketName).key(key))
.build());

// AsyncExampleUtils.randomString() returns a random string up to 100 characters.
String randomString = AsyncExampleUtils.randomString();
logger.info("random string to upload: {}: length={}", randomString, randomString.length());

// Provide the stream of data to be uploaded.
body.writeInputStream(new ByteArrayInputStream(randomString.getBytes()));

return upload.completionFuture().join();
}
}
// snippet-end:[s3.tm.java2.upload_stream.main]
// snippet-end:[s3.tm.java2.upload_stream.complete]
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.example.s3.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;

public class AsyncExampleUtils {
public static S3AsyncClient client = S3AsyncClient.crtCreate();
private static final Logger logger = LoggerFactory.getLogger(AsyncExampleUtils.class);

public static void createBucket(String bucketName) {
client.createBucket(b -> b.bucket(bucketName)).join();
client.waiter().waitUntilBucketExists(b -> b.bucket(bucketName)).join();
logger.info("Bucket {} created.", bucketName);
}

public static void deleteObject(String bucketName, String key) {
client.deleteObject(b -> b.bucket(bucketName).key(key)).join();
logger.info("Object {} deleted from bucket {}.", key, bucketName);
}

public static void deleteBucket(String bucketName) {
client.deleteBucket(b -> b.bucket(bucketName)).join();
client.waiter().waitUntilBucketNotExists(b -> b.bucket(bucketName)).join();
logger.info("Bucket {} deleted.", bucketName);
}

public static String randomString() {
int length = (int)(Math.random()*100);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < length; i++) {
sb.append((char) (Math.random() * 26 + 97));
}
return sb.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static MemoryLog4jAppender createAppender(
public void append(LogEvent event) {
MutableLogEvent eventWithParameters = (MutableLogEvent) event;
if (eventWithParameters.getParameterCount() == 2) {
eventMap.put((String) eventWithParameters.getParameters()[0], (String) eventWithParameters.getParameters()[1]);
eventMap.put(eventWithParameters.getParameters()[0].toString(), eventWithParameters.getParameters()[1].toString());
} else {
eventMap.put (eventWithParameters.getFormattedMessage(), null);
}
Expand Down
47 changes: 47 additions & 0 deletions javav2/example_code/s3/src/test/java/TransferManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,25 @@
import com.example.s3.transfermanager.S3ClientFactory;
import com.example.s3.transfermanager.UploadADirectory;
import com.example.s3.transfermanager.UploadFile;
import com.example.s3.transfermanager.UploadStream;
import com.example.s3.util.AsyncExampleUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedUpload;

import java.util.UUID;

@TestInstance(TestInstance.Lifecycle.PER_METHOD)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
Expand All @@ -23,6 +33,7 @@ class TransferManagerTest {

@Test
@Order(1)
@Tag("IntegrationTest")
public void uploadSingleFileWorks(){
UploadFile upload = new UploadFile();
String etag = upload.uploadFile(S3ClientFactory.transferManager, upload.bucketName,
Expand All @@ -33,6 +44,7 @@ public void uploadSingleFileWorks(){

@Test
@Order(2)
@Tag("IntegrationTest")
public void downloadSingleFileWorks(){
DownloadFile download = new DownloadFile();
Long fileLength = download.downloadFile(S3ClientFactory.transferManager, download.bucketName, download.key, download.downloadedFileWithPath);
Expand All @@ -42,6 +54,7 @@ public void downloadSingleFileWorks(){

@Test
@Order(3)
@Tag("IntegrationTest")
public void copyObjectWorks(){
ObjectCopy copy = new ObjectCopy();
String etag = copy.copyObject(S3ClientFactory.transferManager, copy.bucketName, copy.key, copy.destinationBucket, copy.destinationKey);
Expand All @@ -51,6 +64,7 @@ public void copyObjectWorks(){

@Test
@Order(4)
@Tag("IntegrationTest")
public void directoryUploadWorks(){
UploadADirectory upload = new UploadADirectory();
Integer numFailedUploads = upload.uploadDirectory(S3ClientFactory.transferManager, upload.sourceDirectory, upload.bucketName);
Expand All @@ -61,10 +75,43 @@ public void directoryUploadWorks(){

@Test
@Order(5)
@Tag("IntegrationTest")
public void directoryDownloadWorks(){
DownloadToDirectory download = new DownloadToDirectory();
Integer numFilesFailedToDownload = download.downloadObjectsToDirectory(S3ClientFactory.transferManager, download.destinationPath, download.bucketName);
Assertions.assertNotNull(numFilesFailedToDownload, "Bucket download failed to complete.");
download.cleanUp();
}

@Test
@Order(6)
@Tag("IntegrationTest")
public void uploadStreamWorks(){
String bucketName = "x-" + UUID.randomUUID();
String key = UUID.randomUUID().toString();

AsyncExampleUtils.createBucket(bucketName);
try {
UploadStream example = new UploadStream();
CompletedUpload completedUpload = example.uploadStream(S3TransferManager.create(), bucketName, key);
logger.info("Object {} etag: {}", key, completedUpload.response().eTag());
logger.info("Object {} uploaded to bucket {}.", key, bucketName);
Assertions.assertTrue(completedUpload.response().sdkHttpResponse().isSuccessful());
} catch (SdkException e) {
logger.error(e.getMessage(), e);
} finally {
AsyncExampleUtils.deleteObject(bucketName, key);
AsyncExampleUtils.deleteBucket(bucketName);
}
}

@BeforeAll
public static void beforeAll(){
logger.info("S3TransferManager tests starting ...");
}

@AfterAll
public static void afterAll(){
logger.info("... S3TransferManager tests finished");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.example.s3.async;

import com.example.s3.util.AsyncExampleUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.util.UUID;

class AsyncTests {
private static final Logger logger = LoggerFactory.getLogger(AsyncTests.class);
private String bucketName ;
private String key;

@BeforeEach
void setUp() {
bucketName = "x-" + UUID.randomUUID();
key = UUID.randomUUID().toString();
AsyncExampleUtils.createBucket(bucketName);
}

@AfterEach
void tearDown() {
AsyncExampleUtils.deleteObject(bucketName, key);
AsyncExampleUtils.deleteBucket(bucketName);
}

@Test
@Tag("IntegrationTest")
void putObjectFromStream() {
PutObjectFromStreamAsync example = new PutObjectFromStreamAsync();
PutObjectResponse putObjectResponse = example.putObjectFromStream(AsyncExampleUtils.client, bucketName, key);
Assertions.assertNotNull(putObjectResponse.eTag());
}
}