Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #114]: Support S3 storage and asynchronous I/O scheduling. #118

Merged
merged 19 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pixels-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@
<optional>true</optional>
</dependency>

<!-- aws sdk -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
<optional>true</optional>
</dependency>

<!-- protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import static io.pixelsdb.pixels.common.utils.Constants.LOCK_PATH_PREFIX;
import static io.pixelsdb.pixels.common.utils.Constants.AI_LOCK_PATH_PREFIX;

/**
* Created at: 8/29/21
Expand All @@ -45,7 +45,7 @@ public static void InitId(String idKey)
{
EtcdUtil etcd = EtcdUtil.Instance();
EtcdReadWriteLock readWriteLock = new EtcdReadWriteLock(etcd.getClient(),
LOCK_PATH_PREFIX + idKey);
AI_LOCK_PATH_PREFIX + idKey);
EtcdMutex writeLock = readWriteLock.writeLock();
try
{
Expand Down Expand Up @@ -77,7 +77,7 @@ public static long GenerateId(String idKey)
long id = 0;
EtcdUtil etcd = EtcdUtil.Instance();
EtcdReadWriteLock readWriteLock = new EtcdReadWriteLock(etcd.getClient(),
LOCK_PATH_PREFIX + idKey);
AI_LOCK_PATH_PREFIX + idKey);
EtcdMutex writeLock = readWriteLock.writeLock();
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@

/**
* @author: tao
* @author hank
* @date: Create in 2018-10-27 14:29
**/
public class EtcdReadWriteLock
{
private final EtcdMutex readMutex;
private final EtcdMutex writeMutex;
private EtcdMutex readMutex;
private EtcdMutex writeMutex;

private final Client client;
private final String basePath;
private final byte[] lockData;

private static final String READ_LOCK_NAME = "_READ_";
private static final String WRITE_LOCK_NAME = "_WRIT_";
Expand All @@ -49,8 +54,11 @@ public EtcdReadWriteLock(Client client, String basePath)
*/
/**
 * Creates a read/write lock rooted at the given etcd path.
 * Mutex construction is deferred to {@link #readLock()} / {@link #writeLock()};
 * this constructor only records the inputs needed to build them later.
 *
 * @param client   the etcd client used to build the underlying mutexes
 * @param basePath the base etcd path under which the lock keys live
 * @param lockData the payload stored with the lock keys
 */
public EtcdReadWriteLock(Client client, String basePath, byte[] lockData)
{
    this.client = client;
    this.basePath = basePath;
    this.lockData = lockData;
    // Both mutexes start out absent; they are created lazily on first request.
    this.readMutex = null;
    this.writeMutex = null;
}

/**
Expand All @@ -60,6 +68,10 @@ public EtcdReadWriteLock(Client client, String basePath, byte[] lockData)
*/
/**
 * Returns the mutex used for read locking, creating it lazily on first call.
 * <p>
 * Synchronized to close the check-then-act race in the lazy initialization:
 * without it, two threads calling this concurrently could each observe
 * {@code readMutex == null} and construct two distinct mutex instances.
 *
 * @return the read mutex for this lock
 */
public synchronized EtcdMutex readLock()
{
    if (this.readMutex == null)
    {
        this.readMutex = new EtcdReadWriteLock.InternalInterProcessMutex(
                client, basePath, READ_LOCK_NAME, lockData);
    }
    return this.readMutex;
}

Expand All @@ -70,6 +82,10 @@ public EtcdMutex readLock()
*/
/**
 * Returns the mutex used for write locking, creating it lazily on first call.
 * <p>
 * Synchronized to close the check-then-act race in the lazy initialization:
 * without it, two threads calling this concurrently could each observe
 * {@code writeMutex == null} and construct two distinct mutex instances.
 *
 * @return the write mutex for this lock
 */
public synchronized EtcdMutex writeLock()
{
    if (this.writeMutex == null)
    {
        this.writeMutex = new EtcdReadWriteLock.InternalInterProcessMutex(
                client, basePath, WRITE_LOCK_NAME, lockData);
    }
    return this.writeMutex;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class LockInternals
private Long leaseId = 0L;
private static AtomicInteger count = new AtomicInteger(0);
private volatile Map<String, Long> pathToVersion = new HashMap<>();
private ScheduledExecutorService keepAliveService;

public LockInternals(Client client, String path, String lockName)
{
Expand All @@ -72,8 +73,9 @@ public LockInternals(Client client, String path, String lockName)
logger.error("[create-lease-error]: " + e1);
return;
}
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), 1, 12, TimeUnit.SECONDS);
keepAliveService = Executors.newSingleThreadScheduledExecutor();
keepAliveService.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), 1, 12, TimeUnit.SECONDS);
keepAliveService.shutdown();
}

LockInternals verbose(boolean verbose)
Expand Down Expand Up @@ -348,6 +350,7 @@ private void deleteOurPath(String ourPath) throws Exception
/**
 * Releases the lock held at the given path and stops the background
 * keep-alive scheduler.
 *
 * @param lockPath the etcd path of the lock key to delete
 * @throws Exception if deleting the lock path fails
 */
public void releaseLock(String lockPath) throws Exception
{
    try
    {
        deleteOurPath(lockPath);
    }
    finally
    {
        // Shut the scheduler down even when the delete fails; otherwise the
        // keep-alive task would be leaked and keep running after release.
        this.keepAliveService.shutdownNow();
    }
}

public static class KeepAliveTask implements Runnable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/**
* @author guodong
* @author hank
*/
public interface PhysicalReader
extends Closeable
Expand All @@ -32,14 +35,26 @@ public interface PhysicalReader

void seek(long desired) throws IOException;

int read(byte[] buffer) throws IOException;

int read(byte[] buffer, int offset, int length) throws IOException;
ByteBuffer readFully(int length) throws IOException;

void readFully(byte[] buffer) throws IOException;

void readFully(byte[] buffer, int offset, int length) throws IOException;

/**
* @return true if readAsync is supported.
*/
boolean supportsAsync();

/**
* readAsync does not affect the position of this reader, and is not affected by seek().
* @param offset
* @param length
* @return
* @throws IOException
*/
CompletableFuture<ByteBuffer> readAsync(long offset, int length) throws IOException;

long readLong() throws IOException;

int readInt() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.pixelsdb.pixels.common.physical.impl.PhysicalHDFSReader;
import io.pixelsdb.pixels.common.physical.impl.PhysicalLocalReader;
import io.pixelsdb.pixels.common.physical.impl.PhysicalS3Reader;

import java.io.IOException;

Expand All @@ -40,23 +41,21 @@ public static PhysicalReader newPhysicalReader(Storage storage, String path) thr
{
checkArgument(storage != null, "storage should not be null");
checkArgument(path != null, "path should not be null");
PhysicalReader reader = null;
try
{
switch (storage.getScheme())
{
case hdfs:
reader = new PhysicalHDFSReader(storage, path);
break;
case file:
reader = new PhysicalLocalReader(storage, path);
break;
case s3:
throw new IOException("S3 storage is not supported");
}
} catch (IOException e)

PhysicalReader reader;
switch (storage.getScheme())
{
throw e;
case hdfs:
reader = new PhysicalHDFSReader(storage, path);
break;
case file:
reader = new PhysicalLocalReader(storage, path);
break;
case s3:
reader = new PhysicalS3Reader(storage, path);
break;
default:
throw new IOException("Storage scheme '" + storage.getScheme() + "' is not supported.");
}

return reader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

/**
* @author guodong
* @author hank
*/
public interface PhysicalWriter
extends Closeable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.pixelsdb.pixels.common.physical.impl.PhysicalHDFSWriter;
import io.pixelsdb.pixels.common.physical.impl.PhysicalLocalWriter;
import io.pixelsdb.pixels.common.physical.impl.PhysicalS3Writer;

import java.io.IOException;

Expand All @@ -41,21 +42,21 @@ public static PhysicalWriter newPhysicalWriter(Storage storage, String path, lon
{
checkArgument(storage != null, "storage should not be null");
checkArgument(path != null, "path should not be null");
PhysicalWriter writer = null;
try
{
switch (storage.getScheme())
{
case hdfs:
writer = new PhysicalHDFSWriter(storage, path, replication, addBlockPadding, blockSize);
case file:
writer = new PhysicalLocalWriter(storage, path);
case s3:
throw new IOException("S3 storage is not supported.");
}
} catch (IOException e)

PhysicalWriter writer;
switch (storage.getScheme())
{
throw e;
case hdfs:
writer = new PhysicalHDFSWriter(storage, path, replication, addBlockPadding, blockSize);
break;
case file:
writer = new PhysicalLocalWriter(storage, path);
break;
case s3:
writer = new PhysicalS3Writer(storage, path);
break;
default:
throw new IOException("Storage scheme '" + storage.getScheme() + "' is not supported.");
}

return writer;
Expand Down
Loading