Skip to content

Commit

Permalink
[Issue #120]: support configurable S3 clients. (#122)
Browse files Browse the repository at this point in the history
1. support swithing between async and sync S3 clients in PhysicalS3Reader.
2. support S3 client configuration from pixels.properties file.
3. Using sync client for better performance in S3InputSteam and S3OutputStream.
  • Loading branch information
bianhq authored Oct 8, 2021
1 parent 7bf47fe commit dde1be5
Show file tree
Hide file tree
Showing 15 changed files with 175 additions and 63 deletions.
6 changes: 6 additions & 0 deletions pixels-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<optional>true</optional>
</dependency>

<!-- protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*/
public class StorageFactory
{
private Logger logger = LogManager.getLogger(StorageFactory.class);
private static Logger logger = LogManager.getLogger(StorageFactory.class);
private Map<Storage.Scheme, Storage> storageImpls = new HashMap<>();

private StorageFactory() { }
Expand All @@ -47,6 +47,17 @@ public static StorageFactory Instance()
if (instance == null)
{
instance = new StorageFactory();
Runtime.getRuntime().addShutdownHook(new Thread(()->
{
try
{
instance.closeAll();
} catch (IOException e)
{
logger.error("Failed to close all storage instances.", e);
e.printStackTrace();
}
}));
}
return instance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@

import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -41,14 +46,24 @@
public class PhysicalS3Reader implements PhysicalReader
{
private static Logger logger = LogManager.getLogger(PhysicalS3Reader.class);
private static boolean enableAsync = false;
private static boolean useAsyncClient = false;
private static final ExecutorService clientService;

static
{
clientService = Executors.newFixedThreadPool(32);
Runtime.getRuntime().addShutdownHook( new Thread(clientService::shutdownNow));
}

private S3 s3;
private S3.Path path;
private String pathStr;
private long id;
private AtomicLong position;
private long length;
private S3AsyncClient client;
private S3Client client;
private S3AsyncClient asyncClient;

public PhysicalS3Reader(Storage storage, String path) throws IOException
{
Expand All @@ -71,6 +86,13 @@ public PhysicalS3Reader(Storage storage, String path) throws IOException
this.length = this.s3.getStatus(path).getLength();
this.position = new AtomicLong(0);
this.client = this.s3.getClient();
this.asyncClient = this.s3.getAsyncClient();
enableAsync = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("s3.enable.async"));
useAsyncClient = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("s3.use.async.client"));
if (!useAsyncClient)
{
this.asyncClient.close();
}
}

private String toRange(long start, int length)
Expand Down Expand Up @@ -108,12 +130,12 @@ public ByteBuffer readFully(int len) throws IOException
}
GetObjectRequest request = GetObjectRequest.builder().bucket(path.bucket)
.key(path.key).range(toRange(position.get(), len)).build();
CompletableFuture<ResponseBytes<GetObjectResponse>> future =
client.getObject(request, AsyncResponseTransformer.toBytes());
ResponseBytes<GetObjectResponse> response =
client.getObject(request, ResponseTransformer.toBytes());
try
{
this.position.addAndGet(len);
return ByteBuffer.wrap(future.get().asByteArray());
return ByteBuffer.wrap(response.asByteArray());
} catch (Exception e)
{
throw new IOException("Failed to read object.", e);
Expand All @@ -140,8 +162,7 @@ public void readFully(byte[] buffer, int off, int len) throws IOException
@Override
public boolean supportsAsync()
{
// TODO: async read does not work properly.
return true;
return enableAsync;
}

@Override
Expand All @@ -154,8 +175,21 @@ public CompletableFuture<ByteBuffer> readAsync(long offset, int len) throws IOEx
}
GetObjectRequest request = GetObjectRequest.builder().bucket(path.bucket)
.key(path.key).range(toRange(offset, len)).build();
CompletableFuture<ResponseBytes<GetObjectResponse>> future =
client.getObject(request, AsyncResponseTransformer.toBytes());
CompletableFuture<ResponseBytes<GetObjectResponse>> future;
if (useAsyncClient)
{
future = asyncClient.getObject(request, AsyncResponseTransformer.toBytes());
}
else
{
future = new CompletableFuture<>();
clientService.execute(() -> {
ResponseBytes<GetObjectResponse> response =
client.getObject(request, ResponseTransformer.toBytes());
future.complete(response);
});
}

try
{
CompletableFuture<ByteBuffer> futureBuffer = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.pixelsdb.pixels.common.physical.Storage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -43,7 +43,7 @@ public class PhysicalS3Writer implements PhysicalWriter
private S3.Path path;
private String pathStr;
private long position;
private S3AsyncClient client;
private S3Client client;
private OutputStream out;

public PhysicalS3Writer(Storage storage, String path) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,22 @@
import io.pixelsdb.pixels.common.physical.Location;
import io.pixelsdb.pixels.common.physical.Status;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.common.utils.EtcdUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -60,20 +64,52 @@ public class S3 implements Storage
{
private static Logger logger = LogManager.getLogger(S3.class);

private static int connectionTimeoutSec = 60;
private static int connectionAcquisitionTimeoutSec = 60;
private static int eventLoopGroupThreads = 20;
private static int maxConcurrentRequests = 200;
private static int maxPendingRequests = 50_000;
private static S3Client s3;
private static S3AsyncClient s3Async;

static
{
InitId(S3_ID_KEY);

S3AsyncClientBuilder builder = S3AsyncClient.builder();
//.httpClientBuilder(NettyNioAsyncHttpClient.builder()
// .eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(20).build())
// .maxConcurrency(100).maxPendingConnectionAcquires(10_000));
s3Async = builder.build();

s3 = S3Client.builder().build();
connectionTimeoutSec = Integer.parseInt(
ConfigFactory.Instance().getProperty("s3.connection.timeout.sec"));
connectionAcquisitionTimeoutSec = Integer.parseInt(
ConfigFactory.Instance().getProperty("s3.connection.acquisition.timeout.sec"));
eventLoopGroupThreads = Integer.parseInt(
ConfigFactory.Instance().getProperty("s3.event.loop.group.threads"));
maxConcurrentRequests = Integer.parseInt(
ConfigFactory.Instance().getProperty("s3.max.concurrent.requests"));
maxPendingRequests = Integer.parseInt(
ConfigFactory.Instance().getProperty("s3.max.pending.requests"));

ConfigFactory.Instance().registerUpdateCallback("s3.connection.timeout.sec", value ->
connectionTimeoutSec = Integer.parseInt(value));
ConfigFactory.Instance().registerUpdateCallback("s3.connection.acquisition.timeout.sec", value ->
connectionAcquisitionTimeoutSec = Integer.parseInt(value));
ConfigFactory.Instance().registerUpdateCallback("s3.event.loop.group.threads", value ->
eventLoopGroupThreads = Integer.parseInt(value));
ConfigFactory.Instance().registerUpdateCallback("s3.max.concurrent.requests", value ->
maxConcurrentRequests = Integer.parseInt(value));
ConfigFactory.Instance().registerUpdateCallback("s3.max.pending.requests", value ->
maxPendingRequests = Integer.parseInt(value));

s3Async = S3AsyncClient.builder()
.httpClientBuilder(NettyNioAsyncHttpClient.builder()
.connectionTimeout(Duration.ofSeconds(connectionTimeoutSec))
.connectionAcquisitionTimeout(Duration.ofSeconds(connectionAcquisitionTimeoutSec))
.eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(eventLoopGroupThreads).build())
.maxConcurrency(maxConcurrentRequests).maxPendingConnectionAcquires(maxPendingRequests)).build();

s3 = S3Client.builder().httpClientBuilder(ApacheHttpClient.builder()
.connectionTimeout(Duration.ofSeconds(connectionTimeoutSec))
.socketTimeout(Duration.ofSeconds(connectionTimeoutSec))
.connectionAcquisitionTimeout(Duration.ofSeconds(connectionAcquisitionTimeoutSec))
.maxConnections(maxConcurrentRequests)).build();
}

private String[] allHosts;
Expand Down Expand Up @@ -290,7 +326,7 @@ public DataInputStream open(String path) throws IOException
{
throw new IOException("Path '" + path + "' does not exist.");
}
return new DataInputStream(new S3InputStream(s3Async, p.bucket, p.key));
return new DataInputStream(new S3InputStream(s3, p.bucket, p.key));
}

/**
Expand All @@ -317,7 +353,7 @@ public DataOutputStream create(String path, boolean overwrite, int bufferSize, s
}
long id = GenerateId(S3_ID_KEY);
EtcdUtil.Instance().putKeyValue(getPathKey(path), Long.toString(id));
return new DataOutputStream(new S3OutputStream(s3Async, p.bucket, p.key));
return new DataOutputStream(new S3OutputStream(s3, p.bucket, p.key));
}

@Override
Expand Down Expand Up @@ -419,7 +455,12 @@ public boolean isDirectory(String path)
return new Path(path).isBucket;
}

public S3AsyncClient getClient()
public S3Client getClient()
{
return s3;
}

public S3AsyncClient getAsyncClient()
{
return s3Async;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@
package io.pixelsdb.pixels.common.physical.impl;

import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* Created at: 9/29/21
Expand Down Expand Up @@ -76,7 +74,7 @@ public class S3InputStream extends InputStream
/**
* Amazon S3 client.
*/
private final S3AsyncClient s3Client;
private final S3Client s3Client;

/**
* indicates whether the stream is still open / valid
Expand All @@ -90,7 +88,7 @@ public class S3InputStream extends InputStream
* @param bucket name of the bucket
* @param key path (key) within the bucket
*/
public S3InputStream(S3AsyncClient s3Client, String bucket, String key) throws IOException
public S3InputStream(S3Client s3Client, String bucket, String key) throws IOException
{
this.s3Client = s3Client;
this.bucket = bucket;
Expand All @@ -102,7 +100,7 @@ public S3InputStream(S3AsyncClient s3Client, String bucket, String key) throws I
HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
try
{
HeadObjectResponse response = s3Client.headObject(request).get();
HeadObjectResponse response = s3Client.headObject(request);
this.length = response.contentLength();
} catch (Exception e)
{
Expand Down Expand Up @@ -192,15 +190,14 @@ protected int populateBuffer() throws IOException
}
GetObjectRequest request = GetObjectRequest.builder().bucket(this.bucket)
.key(this.key).range(toRange(this.position, bytesToRead)).build();
CompletableFuture<ResponseBytes<GetObjectResponse>> future =
this.s3Client.getObject(request, AsyncResponseTransformer.toBytes());
ResponseBytes<GetObjectResponse> future = this.s3Client.getObject(request, ResponseTransformer.toBytes());
try
{
this.buffer = future.get().asByteArray();
this.buffer = future.asByteArray();
this.bufferPosition = 0;
this.position += this.buffer.length;
return this.buffer.length;
} catch (InterruptedException | ExecutionException e)
} catch (Exception e)
{
this.buffer = null;
this.bufferPosition = 0;
Expand Down
Loading

0 comments on commit dde1be5

Please sign in to comment.