diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
index bea0b4774..22d26277e 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
@@ -27,8 +27,9 @@ public class TableFormat {
public static final String HUDI = "HUDI";
public static final String ICEBERG = "ICEBERG";
public static final String DELTA = "DELTA";
+ public static final String PARQUET = "PARQUET";
public static String[] values() {
- return new String[] {"HUDI", "ICEBERG", "DELTA"};
+ return new String[] {"HUDI", "ICEBERG", "DELTA", "PARQUET"};
}
}
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index f277495e7..5c3a2359e 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -173,5 +173,11 @@
<artifactId>log4j-slf4j2-impl</artifactId>
<scope>test</scope>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ </dependency>
+
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java b/xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java
new file mode 100644
index 000000000..7eebca90a
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java
@@ -0,0 +1,81 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+public class FileSystemHelper {
+
+ private static final FileSystemHelper INSTANCE = new FileSystemHelper();
+
+ public static FileSystemHelper getInstance() {
+ return INSTANCE;
+ }
+
+ public Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
+ try {
+ FileSystem fs = FileSystem.get(hadoopConf);
+ RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(basePath), true);
+ return remoteIteratorToStream(iterator)
+ .filter(file -> file.getPath().getName().endsWith("parquet"));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Map<String, List<String>> getPartitionFromDirectoryStructure(
+ Configuration hadoopConf, String basePath, Map<String, List<String>> partitionMap) {
+
+ try {
+ FileSystem fs = FileSystem.get(hadoopConf);
+ FileStatus[] baseFileStatus = fs.listStatus(new Path(basePath));
+ Map<String, List<String>> currentPartitionMap = new HashMap<>(partitionMap);
+
+ for (FileStatus dirStatus : baseFileStatus) {
+ if (dirStatus.isDirectory()) {
+ String partitionPath = dirStatus.getPath().getName();
+ if (partitionPath.contains("=")) {
+ String[] partitionKeyValue = partitionPath.split("=");
+ currentPartitionMap
+ .computeIfAbsent(partitionKeyValue[0], k -> new ArrayList<>())
+ .add(partitionKeyValue[1]);
+ currentPartitionMap = getPartitionFromDirectoryStructure(
+ hadoopConf, dirStatus.getPath().toString(), currentPartitionMap);
+ }
+ }
+ }
+ return currentPartitionMap;
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static <T> Stream<T> remoteIteratorToStream(RemoteIterator<T> remoteIterator) {
+ Iterator<T> iterator =
+ new Iterator<T>() {
+ @Override
+ public boolean hasNext() {
+ try {
+ return remoteIterator.hasNext();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public T next() {
+ try {
+ return remoteIterator.next();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ return StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
+ }
+}
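Note: a minimal usage sketch of FileSystemHelper, assuming a hypothetical local table at file:///tmp/parquet_table laid out with hive-style partition directories; the class name and path below are illustrative only.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.xtable.parquet.FileSystemHelper;

    public class FileSystemHelperExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        String basePath = "file:///tmp/parquet_table"; // hypothetical table root

        FileSystemHelper helper = FileSystemHelper.getInstance();

        // Recursively list every *.parquet file under the base path.
        helper
            .getParquetFiles(conf, basePath)
            .map(LocatedFileStatus::getPath)
            .forEach(path -> System.out.println("data file: " + path));

        // Collect partition columns from directory names such as year=2024/month=01.
        Map<String, List<String>> partitions =
            helper.getPartitionFromDirectoryStructure(conf, basePath, Collections.emptyMap());
        partitions.forEach((column, values) -> System.out.println(column + " -> " + values));
      }
    }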
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
new file mode 100644
index 000000000..f51a633a5
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
@@ -0,0 +1,209 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+ private final String tableName;
+ private final String basePath;
+ @NonNull private final Configuration hadoopConf;
+
+ @Builder.Default
+ private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance();
+
+ @Builder.Default private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance();
+
+ @Builder.Default
+ private static final ParquetMetadataExtractor parquetMetadataExtractor =
+ ParquetMetadataExtractor.getInstance();
+
+ @Builder.Default
+ private static final ParquetPartitionHelper parquetPartitionHelper =
+ ParquetPartitionHelper.getInstance();
+
+ private Map<String, List<String>> initPartitionInfo() {
+ return fsHelper.getPartitionFromDirectoryStructure(
+ hadoopConf, basePath, Collections.emptyMap());
+ }
+
+ /**
+ * Infers the table schema from the most recently modified Parquet file, assuming the latest file contains any newly added fields.
+ *
+ * @param modificationTime the commit to consider for reading the table state
+ * @return the table state derived from the latest file and the partition directory layout
+ */
+ @Override
+ public InternalTable getTable(Long modificationTime) {
+
+ Optional<LocatedFileStatus> latestFile =
+ fsHelper
+ .getParquetFiles(hadoopConf, basePath)
+ .max(Comparator.comparing(FileStatus::getModificationTime));
+
+ ParquetMetadata parquetMetadata =
+ parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.get().getPath());
+ Schema tableSchema =
+ new org.apache.parquet.avro.AvroSchemaConverter()
+ .convert(parquetMetadata.getFileMetaData().getSchema());
+
+ Set<String> partitionKeys = initPartitionInfo().keySet();
+
+ // merge the partition columns into the original schema, since they are not stored in the Parquet files
+ if (!partitionKeys.isEmpty()) {
+ tableSchema = mergeAvroSchema(tableSchema, partitionKeys);
+ }
+ InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema);
+
+ List<InternalPartitionField> partitionFields =
+ partitionKeys.isEmpty()
+ ? Collections.emptyList()
+ : parquetPartitionHelper.getInternalPartitionField(partitionKeys, schema);
+ DataLayoutStrategy dataLayoutStrategy =
+ partitionFields.isEmpty()
+ ? DataLayoutStrategy.FLAT
+ : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+ return InternalTable.builder()
+ .tableFormat(TableFormat.PARQUET)
+ .basePath(basePath)
+ .name(tableName)
+ .layoutStrategy(dataLayoutStrategy)
+ .partitioningFields(partitionFields)
+ .readSchema(schema)
+ .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime()))
+ .build();
+ }
+
+ /**
+ * Builds the current snapshot by listing all Parquet files; -1 is passed to getTable because the full listing, rather than a specific commit, drives the table state.
+ *
+ * @return the current snapshot of the table
+ */
+ @Override
+ public InternalSnapshot getCurrentSnapshot() {
+
+ List<LocatedFileStatus> parquetFiles =
+ fsHelper.getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
+ Map<String, List<String>> partitionInfo = initPartitionInfo();
+ InternalTable table = getTable(-1L);
+ List<InternalDataFile> internalDataFiles =
+ parquetFiles.stream()
+ .map(
+ file ->
+ InternalDataFile.builder()
+ .physicalPath(file.getPath().toString())
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .fileSizeBytes(file.getLen())
+ .partitionValues(
+ parquetPartitionHelper.getPartitionValue(
+ basePath,
+ file.getPath().toString(),
+ table.getReadSchema(),
+ partitionInfo))
+ .lastModified(file.getModificationTime())
+ .columnStats(
+ parquetMetadataExtractor.getColumnStatsForaFile(
+ hadoopConf, file, table))
+ .build())
+ .collect(Collectors.toList());
+
+ return InternalSnapshot.builder()
+ .table(table)
+ .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+ .build();
+ }
+
+ /**
+ * Captures newly added files by listing all files whose modification time is greater than the
+ * time of the previous sync.
+ *
+ * @param modificationTime commit to capture table changes for
+ * @return the table change containing files added since the given modification time
+ */
+ @Override
+ public TableChange getTableChangeForCommit(Long modificationTime) {
+ List<LocatedFileStatus> tableChanges =
+ fsHelper
+ .getParquetFiles(hadoopConf, basePath)
+ .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
+ .collect(Collectors.toList());
+ // TODO avoid a full directory listing just to obtain the schema; the modification-time
+ // argument should be honored instead
+ InternalTable internalTable = getTable(-1L);
+ Set<InternalDataFile> internalDataFiles = new HashSet<>();
+ Map<String, List<String>> partitionInfo = initPartitionInfo();
+ for (FileStatus tableStatus : tableChanges) {
+ internalDataFiles.add(
+ InternalDataFile.builder()
+ .physicalPath(tableStatus.getPath().toString())
+ .partitionValues(
+ parquetPartitionHelper.getPartitionValue(
+ basePath,
+ tableStatus.getPath().toString(),
+ internalTable.getReadSchema(),
+ partitionInfo))
+ .lastModified(tableStatus.getModificationTime())
+ .fileSizeBytes(tableStatus.getLen())
+ .columnStats(
+ parquetMetadataExtractor.getColumnStatsForaFile(
+ hadoopConf, tableStatus, internalTable))
+ .build());
+ }
+
+ return TableChange.builder()
+ .tableAsOfChange(internalTable)
+ .filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build())
+ .build();
+ }
+
+ @Override
+ public CommitsBacklog<Long> getCommitsBacklog(
+ InstantsForIncrementalSync instantsForIncrementalSync) {
+
+ List<Long> commitsToProcess =
+ Collections.singletonList(instantsForIncrementalSync.getLastSyncInstant().toEpochMilli());
+
+ return CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
+ }
+
+ // TODO Need to understand how this should be implemented: whether a _SUCCESS marker or a
+ // .staging directory needs to be checked
+ @Override
+ public boolean isIncrementalSyncSafeFrom(Instant instant) {
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ private Schema mergeAvroSchema(Schema internalSchema, Set<String> partitionFields) {
+
+ SchemaBuilder.FieldAssembler<Schema> fieldAssembler =
+ SchemaBuilder.record(internalSchema.getName()).fields();
+ for (Schema.Field field : internalSchema.getFields()) {
+ fieldAssembler = fieldAssembler.name(field.name()).type(field.schema()).noDefault();
+ }
+
+ for (String partitionKey : partitionFields) {
+ fieldAssembler = fieldAssembler.name(partitionKey).type().stringType().noDefault();
+ }
+
+ return fieldAssembler.endRecord();
+ }
+}
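Note: ParquetConversionSource depends on a ParquetMetadataExtractor that is not included in this diff. A minimal sketch of what that helper could look like is below, assuming it only wraps Parquet's footer reading; getColumnStatsForaFile is omitted here because its mapping onto XTable's column-stat model is not shown in this change.

    package org.apache.xtable.parquet;

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    // Sketch only: the extractor referenced above is not part of this diff.
    public class ParquetMetadataExtractor {

      private static final ParquetMetadataExtractor INSTANCE = new ParquetMetadataExtractor();

      public static ParquetMetadataExtractor getInstance() {
        return INSTANCE;
      }

      // Reads the Parquet footer, which carries the file schema and row-group statistics.
      public ParquetMetadata readParquetMetadata(Configuration conf, Path filePath) {
        try (ParquetFileReader reader =
            ParquetFileReader.open(HadoopInputFile.fromPath(filePath, conf))) {
          return reader.getFooter();
        } catch (IOException e) {
          throw new RuntimeException("Unable to read Parquet footer for " + filePath, e);
        }
      }
    }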
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
new file mode 100644
index 000000000..8cb2ca2c7
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+
+/** A concrete implementation of {@link ConversionSourceProvider} for the Parquet table format. */
+public class ParquetConversionSourceProvider extends ConversionSourceProvider<Long> {
+ @Override
+ public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {
+
+    return ParquetConversionSource.builder()
+ .tableName(sourceTable.getName())
+ .basePath(sourceTable.getBasePath())
+ .hadoopConf(new Configuration())
+ .build();
+ }
+}
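Note: a rough illustration of wiring the provider up programmatically, assuming SourceTable exposes builder fields name, basePath, and formatName in the conversion API; the table name and location below are hypothetical.

    import org.apache.xtable.conversion.SourceTable;
    import org.apache.xtable.model.storage.TableFormat;
    import org.apache.xtable.parquet.ParquetConversionSource;
    import org.apache.xtable.parquet.ParquetConversionSourceProvider;

    public class ParquetProviderExample {
      public static void main(String[] args) {
        SourceTable sourceTable =
            SourceTable.builder()
                .name("my_parquet_table") // hypothetical table name
                .basePath("s3://bucket/path/to/parquet_table") // hypothetical location
                .formatName(TableFormat.PARQUET)
                .build();

        ParquetConversionSourceProvider provider = new ParquetConversionSourceProvider();
        ParquetConversionSource source = provider.getConversionSourceInstance(sourceTable);

        // The snapshot carries the inferred schema, partition fields, and data files.
        System.out.println(source.getCurrentSnapshot().getTable().getReadSchema());
      }
    }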
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetInternalDataFileConvertor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetInternalDataFileConvertor.java
new file mode 100644
index 000000000..f5df4a7c5
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetInternalDataFileConvertor.java
@@ -0,0 +1,5 @@
+package org.apache.xtable.parquet;
+
+public class ParquetInternalDataFileConvertor {
+
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java
new file mode 100644
index 000000000..1c185ced0
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java
@@ -0,0 +1,69 @@
+package org.apache.xtable.parquet;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.schema.SchemaFieldFinder;
+
+public class ParquetPartitionHelper {
+ private static final ParquetPartitionHelper INSTANCE = new ParquetPartitionHelper();
+
+ public static ParquetPartitionHelper getInstance() {
+ return INSTANCE;
+ }
+
+ public List<InternalPartitionField> getInternalPartitionField(
+ Set<String> partitionList, InternalSchema schema) {
+ List<InternalPartitionField> partitionFields = new ArrayList<>();
+
+ for (String partitionKey : partitionList) {
+
+ partitionFields.add(
+ InternalPartitionField.builder()
+ .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, partitionKey))
+ .transformType(PartitionTransformType.VALUE)
+ .build());
+ }
+
+ return partitionFields;
+ }
+
+ // TODO this logic is more complicated than it needs to be and can be simplified
+ public List<PartitionValue> getPartitionValue(
+ String basePath,
+ String filePath,
+ InternalSchema schema,
+ Map<String, List<String>> partitionInfo) {
+ List<PartitionValue> partitionValues = new ArrayList<>();
+ java.nio.file.Path base = Paths.get(basePath).normalize();
+ java.nio.file.Path file = Paths.get(filePath).normalize();
+ java.nio.file.Path relative = base.relativize(file);
+ for (Map.Entry<String, List<String>> entry : partitionInfo.entrySet()) {
+ String key = entry.getKey();
+ List<String> values = entry.getValue();
+ for (String value : values) {
+ String pathCheck = key + "=" + value;
+ if (relative.startsWith(pathCheck)) {
+ System.out.println("Relative " + relative + " " + pathCheck);
+ partitionValues.add(
+ PartitionValue.builder()
+ .partitionField(
+ InternalPartitionField.builder()
+ .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, key))
+ .transformType(PartitionTransformType.VALUE)
+ .build())
+ .range(Range.scalar(value))
+ .build());
+ }
+ }
+ }
+ return partitionValues;
+ }
+}
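Note: an illustration of the path matching above, assuming a hypothetical table rooted at /data/events with a single hive-style partition directory year=2024 discovered by FileSystemHelper; the schema construction mirrors XTable's InternalSchema/InternalField builders and is only a sketch.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.xtable.model.schema.InternalField;
    import org.apache.xtable.model.schema.InternalSchema;
    import org.apache.xtable.model.schema.InternalType;
    import org.apache.xtable.model.stat.PartitionValue;
    import org.apache.xtable.parquet.ParquetPartitionHelper;

    public class ParquetPartitionHelperExample {
      public static void main(String[] args) {
        // Minimal schema containing only the partition column "year".
        InternalSchema schema =
            InternalSchema.builder()
                .name("events")
                .dataType(InternalType.RECORD)
                .fields(
                    Collections.singletonList(
                        InternalField.builder()
                            .name("year")
                            .schema(
                                InternalSchema.builder()
                                    .name("string")
                                    .dataType(InternalType.STRING)
                                    .build())
                            .build()))
                .build();

        // Partition info in the shape produced by getPartitionFromDirectoryStructure.
        Map<String, List<String>> partitionInfo =
            Collections.singletonMap("year", Collections.singletonList("2024"));

        List<PartitionValue> values =
            ParquetPartitionHelper.getInstance()
                .getPartitionValue(
                    "/data/events", // hypothetical base path
                    "/data/events/year=2024/part-0001.parquet", // hypothetical data file
                    schema,
                    partitionInfo);

        // Expect a single partition value: year -> 2024
        values.forEach(v -> System.out.println(v.getRange().getMaxValue()));
      }
    }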
diff --git a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
index c80c939bf..9d3686fb4 100644
--- a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
+++ b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
@@ -27,6 +27,8 @@
## configuration: A map of configuration values specific to this converter.
tableFormatConverters:
+ PARQUET:
+ conversionSourceProviderClass: org.apache.xtable.parquet.ParquetConversionSourceProvider
HUDI:
conversionSourceProviderClass: org.apache.xtable.hudi.HudiConversionSourceProvider
DELTA:
@@ -37,4 +39,4 @@ tableFormatConverters:
spark.app.name: xtable
ICEBERG:
conversionSourceProviderClass: org.apache.xtable.iceberg.IcebergConversionSourceProvider
- conversionTargetProviderClass: org.apache.xtable.iceberg.IcebergConversionTarget
\ No newline at end of file
+ conversionTargetProviderClass: org.apache.xtable.iceberg.IcebergConversionTarget
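Note: with the PARQUET entry registered in the defaults above, a dataset config for the sync utility would plausibly look like the following; the bucket, path, and table name are hypothetical and the exact keys should be checked against the xtable-utilities documentation.

    sourceFormat: PARQUET
    targetFormats:
      - DELTA
      - ICEBERG
    datasets:
      - tableBasePath: s3://bucket/path/to/parquet_table
        tableName: my_parquet_table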