Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #158]: support loading data from and to arbitrary storage. #161

Merged
merged 3 commits into from
Feb 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Pixels is a columnar storage engine for data lakes. It is optimized for data ana
Moreover, all the storage optimizations in Pixels, including data layout reordering, columnar caching, and I/O scheduling, are transparent to query engines and underlying file/object storage systems.
Thus, it does not affect the maintainability and portability of the storage layer in data lakes.

## Build
## Build Pixels
Install JDK (>=8.0).
Open Pixels as maven project in Intellij. When the project is fully indexed and the dependencies are successfully downloaded,
use the maven's `package` command to build it. Some test params are missing for the unit tests, you can simply create arbitrary values for them.
Expand Down Expand Up @@ -214,6 +214,14 @@ However, **even if HDFS is not used**, Pixels has to read Hadoop configuration f
is specified by `hdfs.config.dir` in `PIXELS_HOME/pixels.properties`. Therefore, make sure that these two file
exist in `hdfs.config.dir`.

### AWS Credentials
If we use S3 as the underlying storage system, we have to configure the AWS credentials.

Currently, we do not configure the `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_DEFAULT_REGION` from Pixels.
Therefore, we have to configure these credentials using
[environment variables](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html#envvars-set) or
[credential files](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html).

## Start Pixels
Enter `PIXELS_HOME` and start the daemons of Pixels using:
```bash
Expand Down Expand Up @@ -244,9 +252,10 @@ work in other kinds of VMs or physical servers.
### Prepare TPC-H

Attach a volume that is larger than the scale factor (e.g., 150GB for SF100) to the EC2 instance.
Mount the attached volume to a local path (e.g., `/data/tpch-100`).
Mount the attached volume to a local path (e.g., `/data/tpch`).
Download tpch-dbgen to the instance, build it, and generate the dataset and queries into the attached volume.
Here, we put the dataset in `/data/tpch-100/100g/`.
Here, we put the dataset in `/data/tpch/100g/`.
The file(s) of each table are stored in a separate directory named by the table name.

### Create TPC-H Database
Log in presto-cli and use the SQL statements in `scripts/sql/tpch_schema.sql` to create the TPC-H database in Pixels.
Expand All @@ -272,14 +281,14 @@ java -jar pixels-load-*-full.jar

Then use the following command in pixels-load to load data for the TPC-H tables:
```bash
LOAD -f pixels -o file:///data/tpch-100/100g/customer -d tpch -t customer -n 220000 -r \| -c 1
LOAD -f pixels -o file:///data/tpch-100/100g/lineitem -d tpch -t lineitem -n 220000 -r \| -c 1
LOAD -f pixels -o file:///data/tpch-100/100g/nation -d tpch -t nation -n 220000 -r \| -c 1
LOAD -f pixels -o file:///data/tpch-100/100g/orders -d tpch -t orders -n 220000 -r \| -c 1
LOAD -f pixels -o file:///data/tpch-100/100g/part -d tpch -t part -n 220000 -r \| -c 1
LOAD -f pixels -o file:///data/tpch-100/100g/partsupp -d tpch -t partsupp -n 220000 -r \| -c 1
LOAD -f pixels -o file:///data/tpch-100/100g/region -d tpch -t region -n 220000 -r \| -c 1
LOAD -f pixels -o file:///data/tpch-100/100g/supplier -d tpch -t supplier -n 220000 -r \| -c 1
LOAD -f pixels -o file:///data/tpch/100g/customer -d tpch -t customer -n 319150 -r \| -c 1
LOAD -f pixels -o file:///data/tpch/100g/lineitem -d tpch -t lineitem -n 600040 -r \| -c 1
LOAD -f pixels -o file:///data/tpch/100g/nation -d tpch -t nation -n 100 -r \| -c 1
LOAD -f pixels -o file:///data/tpch/100g/orders -d tpch -t orders -n 638300 -r \| -c 1
LOAD -f pixels -o file:///data/tpch/100g/part -d tpch -t part -n 357150 -r \| -c 1
LOAD -f pixels -o file:///data/tpch/100g/partsupp -d tpch -t partsupp -n 360370 -r \| -c 1
LOAD -f pixels -o file:///data/tpch/100g/region -d tpch -t region -n 10 -r \| -c 1
LOAD -f pixels -o file:///data/tpch/100g/supplier -d tpch -t supplier -n 333340 -r \| -c 1
```
This may take a few hours.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ public boolean equals(Scheme other)

Scheme getScheme();

String ensureSchemePrefix(String path) throws IOException;

/**
* Get the statuses of the contents in this path if it is a directory.
* The path in the returned status does not start with the scheme name.
* For local fs, path is considered as local.
* @param path
* @return
Expand All @@ -100,6 +103,7 @@ public boolean equals(Scheme other)

/**
* Get the paths of the contents in this path if it is a directory.
* The returned path does not start with the scheme name.
* For local fs, path is considered as local.
* @param path
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
public class HDFS implements Storage
{
private static Logger logger = LogManager.getLogger(HDFS.class);
private static String SchemePrefix = Scheme.hdfs.name() + "://";

private FileSystem fs;
private Configuration conf;
Expand Down Expand Up @@ -99,6 +100,21 @@ public Scheme getScheme()
return Scheme.hdfs;
}

@Override
public String ensureSchemePrefix(String path) throws IOException
{
if (path.startsWith(SchemePrefix))
{
return path;
}
if (path.contains("://"))
{
throw new IOException("Path '" + path +
"' already has a different scheme prefix than '" + SchemePrefix + "'.");
}
return SchemePrefix + path;
}

@Override
public List<Status> listStatus(String path) throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static io.pixelsdb.pixels.common.lock.EtcdAutoIncrement.InitId;
import static io.pixelsdb.pixels.common.utils.Constants.LOCAL_FS_ID_KEY;
import static io.pixelsdb.pixels.common.utils.Constants.LOCAL_FS_META_PREFIX;
import static java.util.Objects.requireNonNull;

/**
* Created at: 20/08/2021
Expand All @@ -53,6 +54,7 @@ public class LocalFS implements Storage
}

private static Logger logger = LogManager.getLogger(LocalFS.class);
private static String SchemePrefix = Scheme.file.name() + "://";

private String hostName;

Expand Down Expand Up @@ -92,19 +94,73 @@ private String getHostFromPathKey(String pathKey)
return pathKey.substring(last + 1);
}

public static class Path
{
public String realPath = null;
public boolean valid = false;
public boolean isDir = false;

public Path(String path)
{
requireNonNull(path);
if (path.startsWith("file:///"))
{
valid = true;
realPath = path.substring(path.indexOf("://") + 3);
}
else if (path.startsWith("/"))
{
valid = true;
realPath = path;
}

if (valid)
{
File file = new File(realPath);
isDir = file.isDirectory();
}
}

@Override
public String toString()
{
if (!this.valid)
{
return null;
}
return this.realPath;
}
}

@Override
public Scheme getScheme()
{
return Scheme.file;
}

@Override
public String ensureSchemePrefix(String path) throws IOException
{
if (path.startsWith(SchemePrefix))
{
return path;
}
if (path.contains("://"))
{
throw new IOException("Path '" + path +
"' already has a different scheme prefix than '" + SchemePrefix + "'.");
}
return SchemePrefix + path;
}

@Override
public List<Status> listStatus(String path) throws IOException
{
File[] files = new File(path).listFiles();
Path p = new Path(path);
File[] files = new File(p.realPath).listFiles();
if (files == null)
{
throw new IOException("Failed to list files in path: " + path + ".");
throw new IOException("Failed to list files in path: " + p.realPath + ".");
}
else
{
Expand All @@ -115,16 +171,17 @@ public List<Status> listStatus(String path) throws IOException
@Override
public Status getStatus(String path)
{
return new Status(new File(path));
return new Status(new File(new Path(path).realPath));
}

@Override
public List<String> listPaths(String path) throws IOException
{
File[] files = new File(path).listFiles();
Path p = new Path(path);
File[] files = new File(p.realPath).listFiles();
if (files == null)
{
throw new IOException("Failed to list files in path: " + path + ".");
throw new IOException("Failed to list files in path: " + p.realPath + ".");
}
else
{
Expand All @@ -136,6 +193,16 @@ public List<String> listPaths(String path) throws IOException
public long getFileId(String path) throws IOException
{
KeyValue kv = EtcdUtil.Instance().getKeyValue(getPathKey(path));
if (kv == null)
{
/**
* Issue #158:
* Create an id for this file if it does not exist in etcd.
*/
long id = GenerateId(LOCAL_FS_ID_KEY);
EtcdUtil.Instance().putKeyValue(getPathKey(path), Long.toString(id));
return id;
}
return Long.parseLong(kv.getValue().toString(StandardCharsets.UTF_8));
}

Expand Down Expand Up @@ -170,39 +237,42 @@ public String[] getHosts(String path) throws IOException
@Override
public DataInputStream open(String path) throws IOException
{
File file = new File(path);
Path p = new Path(path);
File file = new File(p.realPath);
if (file.isDirectory())
{
throw new IOException("Path '" + path + "' is a directory, it must be a file.");
throw new IOException("Path '" + p.realPath + "' is a directory, it must be a file.");
}
if (!file.exists())
{
throw new IOException("File '" + path + "' doesn't exists.");
throw new IOException("File '" + p.realPath + "' doesn't exists.");
}
return new DataInputStream(new FileInputStream(file));
}

public RandomAccessFile openRaf(String path) throws IOException
{
File file = new File(path);
Path p = new Path(path);
File file = new File(p.realPath);
if (file.isDirectory())
{
throw new IOException("Path '" + path + "' is a directory, it must be a file.");
throw new IOException("Path '" + p.realPath + "' is a directory, it must be a file.");
}
if (!file.exists())
{
throw new IOException("File '" + path + "' doesn't exists.");
throw new IOException("File '" + p.realPath + "' doesn't exists.");
}
return new RandomAccessFile(file, "r");
}

@Override
public DataOutputStream create(String path, boolean overwrite, int bufferSize, short replication) throws IOException
{
File file = new File(path);
Path p = new Path(path);
File file = new File(p.realPath);
if (file.isDirectory())
{
throw new IOException("Path '" + path + "' is a directory, it must be a file.");
throw new IOException("Path '" + p.realPath + "' is a directory, it must be a file.");
}
if (file.exists())
{
Expand All @@ -212,14 +282,14 @@ public DataOutputStream create(String path, boolean overwrite, int bufferSize, s
}
else
{
throw new IOException("File '" + path + "' already exists.");
throw new IOException("File '" + p.realPath + "' already exists.");
}
}
long id = GenerateId(LOCAL_FS_ID_KEY);
EtcdUtil.Instance().putKeyValue(getPathKey(path), Long.toString(id));
if (!file.createNewFile())
{
throw new IOException("Failed to create local file '" + path + "'.");
throw new IOException("Failed to create local file '" + p.realPath + "'.");
}
return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), bufferSize));
}
Expand All @@ -233,13 +303,14 @@ public DataOutputStream create(String path, boolean overwrite, int bufferSize, s
@Override
public boolean delete(String path, boolean recursive) throws IOException
{
File file = new File(path);
Path p = new Path(path);
File file = new File(p.realPath);
boolean subDeleted = true;
if (file.isDirectory())
{
if (!recursive)
{
throw new IOException("Can not delete a directory '" + path + "' with recursive = false.");
throw new IOException("Can not delete a directory '" + p.realPath + "' with recursive = false.");
}
else
{
Expand Down Expand Up @@ -279,18 +350,18 @@ public void close() throws IOException { }
@Override
public boolean exists(String path)
{
return new File(path).exists();
return new File(new Path(path).realPath).exists();
}

@Override
public boolean isFile(String path)
{
return new File(path).isFile();
return !new Path(path).isDir;
}

@Override
public boolean isDirectory(String path)
{
return new File(path).isDirectory();
return new Path(path).isDir;
}
}
Loading