Skip to content
This repository has been archived by the owner on May 20, 2021. It is now read-only.

Commit

Permalink
Merge pull request #160 from Nasdaq/s3-cse-encryption-and-compression
Browse files Browse the repository at this point in the history
S3 cse encryption and compression
  • Loading branch information
andykram committed Feb 4, 2016
2 parents a4e662a + f1b4495 commit 4d313d0
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 14 deletions.
12 changes: 12 additions & 0 deletions src/main/java/com/airbnb/airpal/AirpalConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public class AirpalConfiguration extends Configuration
@JsonProperty
private String s3Bucket;

@Getter
@Setter
@JsonProperty
private String s3EncryptionMaterialsProvider;

@Getter
@Setter
@JsonProperty
Expand Down Expand Up @@ -115,4 +120,11 @@ public class AirpalConfiguration extends Configuration
@JsonProperty
@NotNull
private boolean useS3 = false;

@Getter
@Setter
@Valid
@JsonProperty
@NotNull
private boolean compressedOutput = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.List;
import java.util.UUID;
import java.util.zip.GZIPOutputStream;

@Slf4j
public class CsvOutputBuilder implements JobOutputBuilder
Expand All @@ -34,13 +36,20 @@ public class CsvOutputBuilder implements JobOutputBuilder
@JsonIgnore
private final UUID jobUUID;

/**
 * Creates a CSV builder that writes to a fresh temp file whose name starts with the job UUID.
 *
 * NOTE(review): this span is a rendered diff, so the pre-change signature and the pre-change
 * csvWriter assignment appear directly before their replacements.
 *
 * @param includeHeader stored on the builder unchanged
 * @param jobUUID job identifier; also used as the temp-file name prefix
 * @param maxFileSizeBytes stored on the builder unchanged
 * @param compressedOutput when true, CSV text is gzip-compressed before it reaches the counting stream
 * @throws IOException if the temp file or any of the output streams cannot be created
 */
public CsvOutputBuilder(boolean includeHeader, UUID jobUUID, long maxFileSizeBytes) throws IOException {
public CsvOutputBuilder(boolean includeHeader, UUID jobUUID, long maxFileSizeBytes, boolean compressedOutput) throws IOException {
this.includeHeader = includeHeader;
this.jobUUID = jobUUID;
this.outputFile = File.createTempFile(jobUUID.toString(), FILE_SUFFIX);
this.maxFileSizeBytes = maxFileSizeBytes;
// The counting stream sits directly on top of the file stream.
this.countingOutputStream = new CountingOutputStream(new FileOutputStream(this.outputFile));
this.csvWriter = new CSVWriter(new OutputStreamWriter(this.countingOutputStream));
// NOTE(review): OutputStreamWriter is built without an explicit charset, so the platform default
// applies, while S3FilePersistor labels the upload as CSV_UTF_8 -- confirm these always agree.
OutputStreamWriter writer;
if (compressedOutput) {
// The gzip stream wraps the counting stream, so the counter presumably sees compressed bytes
// rather than raw CSV bytes -- verify against how maxFileSizeBytes is enforced.
writer = new OutputStreamWriter(new GZIPOutputStream(this.countingOutputStream));
}
else {
writer = new OutputStreamWriter(this.countingOutputStream);
}
this.csvWriter = new CSVWriter(writer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
public class OutputBuilderFactory
{
private final long maxFileSizeBytes;
private final boolean isCompressedOutput;

public JobOutputBuilder forJob(Job job)
throws IOException, InvalidQueryException
{
PersistentJobOutput output = job.getOutput();
switch (output.getType()) {
case "csv":
return new CsvOutputBuilder(true, job.getUuid(), maxFileSizeBytes);
return new CsvOutputBuilder(true, job.getUuid(), maxFileSizeBytes, isCompressedOutput);
case "hive":
HiveTablePersistentOutput hiveOutput = (HiveTablePersistentOutput) output;
URI location = output.getLocation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ public class CSVPersistorFactory
private AmazonS3 s3Client;
private String s3Bucket;
private ExpiringFileStore expiringFileStore;
private boolean compressedOutput;

public Persistor getPersistor(Job job, PersistentJobOutput jobOutput)
{
// TODO: Support variable CSV persistor.
if (useS3Persistor) {
return new S3FilePersistor(s3Client, s3Bucket, 0L);
return new S3FilePersistor(s3Client, s3Bucket, 0L, compressedOutput);
} else {
return new FlatFilePersistor(expiringFileStore);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class S3FilePersistor
private final AmazonS3 s3Client;
private final String outputBucket;
private final long maxSizeForTextView;
private final boolean compressedOutput;

@Override
public boolean canPersist(QueryExecutionAuthorizer authorizer)
Expand All @@ -46,6 +47,9 @@ public URI persist(JobOutputBuilder outputBuilder, Job job)
val objectMetaData = new ObjectMetadata();
objectMetaData.setContentLength(file.length());
objectMetaData.setContentType(MediaType.CSV_UTF_8.toString());
if (compressedOutput) {
objectMetaData.setContentEncoding("gzip");
}

val putRequest = new PutObjectRequest(
outputBucket,
Expand Down
48 changes: 42 additions & 6 deletions src/main/java/com/airbnb/airpal/modules/AirpalModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3EncryptionClient;
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaMapper;
import com.google.common.base.Strings;
Expand All @@ -64,6 +66,7 @@
import io.dropwizard.setup.Environment;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.web.env.EnvironmentLoaderListener;
import org.jetbrains.annotations.Nullable;
import org.skife.jdbi.v2.DBI;

import javax.inject.Named;
Expand Down Expand Up @@ -284,14 +287,47 @@ public AWSCredentials provideAWSCredentials()

/**
 * Provides the S3 client used by the application.
 *
 * Credentials: a null {@code awsCredentials} selects instance-profile credentials; otherwise the
 * given credentials are used. Encryption: a null {@code encryptionMaterialsProvider} yields a plain
 * {@link AmazonS3Client}; otherwise an {@link AmazonS3EncryptionClient} is built with that provider.
 *
 * NOTE(review): this span is a rendered diff, so the pre-change signature and return statements
 * appear directly before their replacements.
 * NOTE(review): both parameters are compared against null but neither is annotated @Nullable;
 * confirm the injector accepts null for these parameters.
 */
@Singleton
@Provides
public AmazonS3 provideAmazonS3Client(AWSCredentials awsCredentials)
public AmazonS3 provideAmazonS3Client(AWSCredentials awsCredentials, EncryptionMaterialsProvider encryptionMaterialsProvider)
{
// No explicit credentials: use the instance profile.
if (awsCredentials == null) {
InstanceProfileCredentialsProvider iamCredentials = new InstanceProfileCredentialsProvider();
return new AmazonS3Client(iamCredentials);
if (encryptionMaterialsProvider == null) {
return new AmazonS3Client(new InstanceProfileCredentialsProvider());
}
else {
return new AmazonS3EncryptionClient(new InstanceProfileCredentialsProvider(), encryptionMaterialsProvider);
}
}

// Explicit credentials were supplied.
return new AmazonS3Client(awsCredentials);
if (encryptionMaterialsProvider == null) {
return new AmazonS3Client(awsCredentials);
}
else {
return new AmazonS3EncryptionClient(awsCredentials, encryptionMaterialsProvider);
}
}

/**
 * Builds the optional S3 client-side encryption materials provider named in the configuration.
 *
 * The configured value is a fully qualified class name. The class must implement
 * {@link EncryptionMaterialsProvider} and expose a no-argument constructor; it is loaded and
 * instantiated reflectively.
 *
 * @return a new provider instance, or {@code null} when no class name is configured
 *         (provideAmazonS3Client falls back to a non-encrypting client in that case)
 * @throws IllegalArgumentException if the configured class does not implement
 *         {@link EncryptionMaterialsProvider}
 * @throws RuntimeException if the class cannot be loaded or instantiated
 */
@Nullable
@Singleton
@Provides
private EncryptionMaterialsProvider provideEncryptionMaterialsProvider() {
    String empClassName = config.getS3EncryptionMaterialsProvider();
    if (empClassName == null) {
        return null;
    }

    try {
        Class<?> empClass = Class.forName(empClassName);
        // Check the type before instantiating so the constructor of an unrelated class never runs.
        if (!EncryptionMaterialsProvider.class.isAssignableFrom(empClass)) {
            throw new IllegalArgumentException("Class " + empClassName + " must implement EncryptionMaterialsProvider");
        }
        // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance(), which
        // rethrows checked constructor exceptions without declaring them.
        return empClass.asSubclass(EncryptionMaterialsProvider.class).getDeclaredConstructor().newInstance();
    }
    catch (ReflectiveOperationException x) {
        // Only reflective failures are wrapped; the IllegalArgumentException above propagates
        // unchanged instead of being swallowed and re-wrapped by a catch-all.
        throw new RuntimeException("Unable to initialize EncryptionMaterialsProvider class " + empClassName + ": " + x, x);
    }
}

@Singleton
Expand Down Expand Up @@ -334,7 +370,7 @@ public ExpiringFileStore provideExpiringFileStore()
@Singleton
public CSVPersistorFactory provideCSVPersistorFactory(ExpiringFileStore fileStore, AmazonS3 s3Client, @Named("s3Bucket") String s3Bucket)
{
return new CSVPersistorFactory(config.isUseS3(), s3Client, s3Bucket, fileStore);
return new CSVPersistorFactory(config.isUseS3(), s3Client, s3Bucket, fileStore, config.isCompressedOutput());
}

@Provides
Expand All @@ -349,6 +385,6 @@ public PersistorFactory providePersistorFactory(CSVPersistorFactory csvPersistor
public OutputBuilderFactory provideOutputBuilderFactory()
{
long maxFileSizeInBytes = Math.round(Math.floor(config.getMaxOutputSize().getValue(DataSize.Unit.BYTE)));
return new OutputBuilderFactory(maxFileSizeInBytes);
return new OutputBuilderFactory(maxFileSizeInBytes, config.isCompressedOutput());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.airbnb.airpal.resources;

import com.amazonaws.services.s3.model.ObjectMetadata;
import com.opencsv.CSVReader;
import com.airbnb.airpal.core.store.files.ExpiringFileStore;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -23,13 +24,15 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.net.URI;
import java.util.zip.GZIPInputStream;

@Slf4j
@Path("/api/preview")
Expand Down Expand Up @@ -82,9 +85,19 @@ private Response getS3Preview(URI fileURI, int numLines) {
outputKey
).withRange(0, 100 * 1024);
val object = s3Client.getObject(request);
try (val s3Reader = new CSVReader(new BufferedReader(new InputStreamReader(object.getObjectContent())))) {
return getPreviewFromCSV(s3Reader, numLines);
} catch (IOException e) {
ObjectMetadata objectMetadata = object.getObjectMetadata();
boolean gzip = "gzip".equalsIgnoreCase(objectMetadata.getContentEncoding());
try (InputStream input = object.getObjectContent()) {
InputStreamReader reader;
if (gzip) {
reader = new InputStreamReader(new GZIPInputStream(input));
}
else {
reader = new InputStreamReader(input);
}
return getPreviewFromCSV(new CSVReader(reader), numLines);
}
catch (IOException e) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import lombok.val;
Expand Down Expand Up @@ -52,7 +53,12 @@ public Response getFile(@PathParam("filename") String filename)
if (object == null) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.ok(new StreamingOutput() {
ObjectMetadata objectMetadata = object.getObjectMetadata();
Response.ResponseBuilder builder = Response.ok().type(objectMetadata.getContentType());
if (objectMetadata.getContentEncoding() != null) {
builder = builder.encoding(objectMetadata.getContentEncoding()); // gzip
}
return builder.entity(new StreamingOutput() {
@Override
public void write(OutputStream output)
throws IOException, WebApplicationException
Expand Down

0 comments on commit 4d313d0

Please sign in to comment.