feat(filesystems): add compression.method for zip/gzip/bzip support
Allows reading compressed data stored on S3.
dev-jimbo authored and fhussonnois committed Jan 24, 2025
1 parent cca7d2e commit 6cc9366
Showing 8 changed files with 500 additions and 8 deletions.
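
For context, the new "compression.method" option rides on the same properties map used to configure the S3 reader; the test fixtures below set it to "bzip", "gzip", and "zip" respectively. A minimal sketch of enabling it, assuming only the AmazonS3ClientConfig constants that appear in those tests (the class name and all values here are illustrative placeholders, not part of this commit):

import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*;

import java.util.HashMap;
import java.util.Map;

public class CompressionMethodExample {

    // Builds a reader configuration that transparently decompresses gzip objects;
    // per this commit, "compression.method" also accepts "zip" and "bzip".
    // Credential, bucket, and region values are illustrative placeholders.
    public static Map<String, String> gzipReaderProperties() {
        final Map<String, String> properties = new HashMap<>();
        properties.put(AWS_ACCESS_KEY_ID_CONFIG, "my-key-id");
        properties.put(AWS_SECRET_ACCESS_KEY_CONFIG, "my-secret-key");
        properties.put(AWS_S3_BUCKET_NAME_CONFIG, "my-bucket");
        properties.put(AWS_S3_REGION_CONFIG, "us-west-2");
        properties.put("compression.method", "gzip");
        return properties;
    }
}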
@@ -0,0 +1,55 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 * Copyright (c) StreamThoughts
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.streamthoughts.kafka.connect.filepulse.fs;

import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*;

import com.amazonaws.services.s3.AmazonS3;
import io.findify.s3mock.S3Mock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.junit.After;
import org.junit.Before;

public class BaseBzipAmazonS3Test {

    public static final String S3_TEST_BUCKET = "testbucket";

    protected S3Mock s3Mock;
    protected AmazonS3 client;
    protected String endpointConfiguration;
    protected Map<String, String> unmodifiableCommonsProperties;

    @Before
    public void setUp() throws Exception {
        // Bind the in-memory S3 mock to a random port in [10000, 20000) to avoid collisions.
        final Random generator = new Random();
        final int s3Port = generator.nextInt(10000) + 10000;
        s3Mock = new S3Mock.Builder().withPort(s3Port).withInMemoryBackend().build();
        s3Mock.start();

        endpointConfiguration = "http://localhost:" + s3Port;
        final Map<String, String> properties = new HashMap<>();
        properties.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id");
        properties.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key");
        properties.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET);
        properties.put(AWS_S3_REGION_CONFIG, "us-west-2");
        properties.put("compression.method", "bzip");
        unmodifiableCommonsProperties = Collections.unmodifiableMap(properties);

        client = AmazonS3ClientUtils.createS3Client(
                new AmazonS3ClientConfig(unmodifiableCommonsProperties),
                endpointConfiguration
        );
    }

    @After
    public void tearDown() throws Exception {
        client.shutdown();
        s3Mock.shutdown();
    }
}
@@ -0,0 +1,55 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 * Copyright (c) StreamThoughts
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.streamthoughts.kafka.connect.filepulse.fs;

import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*;

import com.amazonaws.services.s3.AmazonS3;
import io.findify.s3mock.S3Mock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.junit.After;
import org.junit.Before;

public class BaseGzipAmazonS3Test {

    public static final String S3_TEST_BUCKET = "testbucket";

    protected S3Mock s3Mock;
    protected AmazonS3 client;
    protected String endpointConfiguration;
    protected Map<String, String> unmodifiableCommonsProperties;

    @Before
    public void setUp() throws Exception {
        // Bind the in-memory S3 mock to a random port in [10000, 20000) to avoid collisions.
        final Random generator = new Random();
        final int s3Port = generator.nextInt(10000) + 10000;
        s3Mock = new S3Mock.Builder().withPort(s3Port).withInMemoryBackend().build();
        s3Mock.start();

        endpointConfiguration = "http://localhost:" + s3Port;
        final Map<String, String> properties = new HashMap<>();
        properties.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id");
        properties.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key");
        properties.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET);
        properties.put(AWS_S3_REGION_CONFIG, "us-west-2");
        properties.put("compression.method", "gzip");
        unmodifiableCommonsProperties = Collections.unmodifiableMap(properties);

        client = AmazonS3ClientUtils.createS3Client(
                new AmazonS3ClientConfig(unmodifiableCommonsProperties),
                endpointConfiguration
        );
    }

    @After
    public void tearDown() throws Exception {
        client.shutdown();
        s3Mock.shutdown();
    }
}
@@ -0,0 +1,55 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 * Copyright (c) StreamThoughts
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.streamthoughts.kafka.connect.filepulse.fs;

import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*;

import com.amazonaws.services.s3.AmazonS3;
import io.findify.s3mock.S3Mock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.junit.After;
import org.junit.Before;

public class BaseZipAmazonS3Test {

    public static final String S3_TEST_BUCKET = "testbucket";

    protected S3Mock s3Mock;
    protected AmazonS3 client;
    protected String endpointConfiguration;
    protected Map<String, String> unmodifiableCommonsProperties;

    @Before
    public void setUp() throws Exception {
        // Bind the in-memory S3 mock to a random port in [10000, 20000) to avoid collisions.
        final Random generator = new Random();
        final int s3Port = generator.nextInt(10000) + 10000;
        s3Mock = new S3Mock.Builder().withPort(s3Port).withInMemoryBackend().build();
        s3Mock.start();

        endpointConfiguration = "http://localhost:" + s3Port;
        final Map<String, String> properties = new HashMap<>();
        properties.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id");
        properties.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key");
        properties.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET);
        properties.put(AWS_S3_REGION_CONFIG, "us-west-2");
        properties.put("compression.method", "zip");
        unmodifiableCommonsProperties = Collections.unmodifiableMap(properties);

        client = AmazonS3ClientUtils.createS3Client(
                new AmazonS3ClientConfig(unmodifiableCommonsProperties),
                endpointConfiguration
        );
    }

    @After
    public void tearDown() throws Exception {
        client.shutdown();
        s3Mock.shutdown();
    }
}
@@ -0,0 +1,90 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 * Copyright (c) StreamThoughts
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.streamthoughts.kafka.connect.filepulse.fs.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage;
import io.streamthoughts.kafka.connect.filepulse.fs.BaseBzipAmazonS3Test;
import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class AmazonS3RowBzipFileInputReaderTest extends BaseBzipAmazonS3Test {

    private static final String LF = "\n";

    private static final int NLINES = 10;

    @Rule
    public TemporaryFolder testFolder = new TemporaryFolder();

    private File objectFile;

    private AmazonS3RowFileInputReader reader;

    @Before
    @Override
    public void setUp() throws Exception {
        super.setUp();
        // Write a bzip2-compressed fixture file that the reader should decompress transparently.
        objectFile = testFolder.newFile();
        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
                new BZip2CompressorOutputStream(new FileOutputStream(objectFile)),
                StandardCharsets.UTF_8))) {
            generateLines(writer);
        }

        reader = new AmazonS3RowFileInputReader();
        reader.setStorage(new AmazonS3Storage(client));
        reader.configure(unmodifiableCommonsProperties);
    }

    @After
    @Override
    public void tearDown() throws Exception {
        // Close the reader before the mock S3 endpoint is shut down.
        reader.close();
        super.tearDown();
    }

    @Test
    public void should_read_all_bzip_lines() {
        client.createBucket(S3_TEST_BUCKET);
        client.putObject(S3_TEST_BUCKET, "my-key", objectFile);

        final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
                .withUri(new S3BucketKey(S3_TEST_BUCKET, "my-key").toURI())
                .build();

        final FileInputIterator<FileRecord<TypedStruct>> iterator = reader.newIterator(meta.uri());
        final List<FileRecord<TypedStruct>> results = new ArrayList<>();
        while (iterator.hasNext()) {
            final RecordsIterable<FileRecord<TypedStruct>> next = iterator.next();
            results.addAll(next.collect());
        }
        Assert.assertEquals(NLINES, results.size());
    }

    private void generateLines(final BufferedWriter writer) throws IOException {
        for (int i = 0; i < NLINES; i++) {
            writer.write("00000000-" + i);
            if (i + 1 < NLINES) {
                writer.write(LF);
            }
        }
    }
}
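
As a sanity check outside the reader, the bzip2 fixture written in setUp can be decompressed directly with commons-compress. A minimal sketch (not part of this commit; the class and method names are hypothetical):

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;

public class Bzip2FixtureCheck {

    // Streams a bzip2-compressed text file line by line, mirroring the
    // transparent decompression expected when compression.method=bzip.
    public static long countLines(final String path) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new BZip2CompressorInputStream(new FileInputStream(path)),
                StandardCharsets.UTF_8))) {
            return reader.lines().count();
        }
    }
}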
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) StreamThoughts
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.streamthoughts.kafka.connect.filepulse.fs.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage;
import io.streamthoughts.kafka.connect.filepulse.fs.BaseGzipAmazonS3Test;
import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class AmazonS3RowGzipFileInputReaderTest extends BaseGzipAmazonS3Test {

private static final String LF = "\n";

private static final int NLINES = 10;

@Rule
public TemporaryFolder testFolder = new TemporaryFolder();

private File objectFile;

private AmazonS3RowFileInputReader reader;

@Before
public void setUp() throws Exception {
super.setUp();
objectFile = testFolder.newFile();
System.out.println(objectFile.toPath());
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(objectFile.toPath().toString())), StandardCharsets.UTF_8));
generateLines(writer);

reader = new AmazonS3RowFileInputReader();
reader.setStorage(new AmazonS3Storage(client));
reader.configure(unmodifiableCommonsProperties);
}

@Override
public void tearDown() throws Exception {
super.tearDown();
reader.close();
}

@Test
public void should_read_all_gzip_lines() {
client.createBucket(S3_TEST_BUCKET);
client.putObject(S3_TEST_BUCKET, "my-key", objectFile);

final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
.withUri(new S3BucketKey(S3_TEST_BUCKET, "my-key").toURI())
.build();

final FileInputIterator<FileRecord<TypedStruct>> iterator = reader.newIterator(meta.uri());
List<FileRecord<TypedStruct>> results = new ArrayList<>();
while (iterator.hasNext()) {
final RecordsIterable<FileRecord<TypedStruct>> next = iterator.next();
results.addAll(next.collect());
}
Assert.assertEquals(10, results.size());
}

private void generateLines(final BufferedWriter writer) throws IOException {

for (int i = 0; i < NLINES; i++) {
String line = "00000000-" + i;
writer.write(line);
if (i + 1 < NLINES) {
writer.write(LF);
}
}
writer.flush();
writer.close();
}
}
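
The gzip counterpart needs no third-party library; a comparable sketch (again hypothetical, not part of this commit) uses java.util.zip from the JDK:

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

public class GzipFixtureCheck {

    // Streams a gzip-compressed text file line by line, mirroring the
    // transparent decompression expected when compression.method=gzip.
    public static long countLines(final String path) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream(path)),
                StandardCharsets.UTF_8))) {
            return reader.lines().count();
        }
    }
}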
