diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
index bea0b4774..22d26277e 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
@@ -27,8 +27,9 @@ public class TableFormat {
   public static final String HUDI = "HUDI";
   public static final String ICEBERG = "ICEBERG";
   public static final String DELTA = "DELTA";
+  public static final String PARQUET = "PARQUET";
 
   public static String[] values() {
-    return new String[] {"HUDI", "ICEBERG", "DELTA"};
+    return new String[] {"HUDI", "ICEBERG", "DELTA", "PARQUET"};
   }
 }
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index f277495e7..5c3a2359e 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -173,5 +173,11 @@
       <artifactId>log4j-slf4j2-impl</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+    </dependency>
+
   </dependencies>
 </project>
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java b/xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java
new file mode 100644
index 000000000..7eebca90a
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java
@@ -0,0 +1,81 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+public class FileSystemHelper {
+
+  private static final FileSystemHelper INSTANCE = new FileSystemHelper();
+
+  public static FileSystemHelper getInstance() {
+    return INSTANCE;
+  }
+
+  public Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
+    try {
+      FileSystem fs = FileSystem.get(hadoopConf);
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(basePath), true);
+      return remoteIteratorToStream(iterator)
+          .filter(file -> file.getPath().getName().endsWith("parquet"));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public Map<String, List<String>> getPartitionFromDirectoryStructure(
+      Configuration hadoopConf, String basePath, Map<String, List<String>> partitionMap) {
+
+    try {
+      FileSystem fs = FileSystem.get(hadoopConf);
+      FileStatus[] baseFileStatus = fs.listStatus(new Path(basePath));
+      Map<String, List<String>> currentPartitionMap = new HashMap<>(partitionMap);
+
+      for (FileStatus dirStatus : baseFileStatus) {
+        if (dirStatus.isDirectory()) {
+          String partitionPath = dirStatus.getPath().getName();
+          if (partitionPath.contains("=")) {
+            String[] partitionKeyValue = partitionPath.split("=");
+            currentPartitionMap
+                .computeIfAbsent(partitionKeyValue[0], k -> new ArrayList<>())
+                .add(partitionKeyValue[1]);
+            // recurse into nested partition directories and carry their keys/values back up
+            currentPartitionMap =
+                getPartitionFromDirectoryStructure(
+                    hadoopConf, dirStatus.getPath().toString(), currentPartitionMap);
+          }
+        }
+      }
+      return currentPartitionMap;
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static <T> Stream<T> remoteIteratorToStream(RemoteIterator<T> remoteIterator) {
+    Iterator<T> iterator =
+        new Iterator<T>() {
+          @Override
+          public boolean hasNext() {
+            try {
+              return remoteIterator.hasNext();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          @Override
+          public T next() {
+            try {
+              return remoteIterator.next();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+    return StreamSupport.stream(
+        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
+  }
+}
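For illustration only (not part of the patch): a minimal sketch of how the partition discovery above is meant to behave for a hive-style directory layout. The base path, table layout, and values are hypothetical.

    // Hypothetical layout:
    //   /tmp/trades/year=2024/month=01/part-00000.parquet
    //   /tmp/trades/year=2024/month=02/part-00001.parquet
    Configuration conf = new Configuration();
    Map<String, List<String>> partitions =
        FileSystemHelper.getInstance()
            .getPartitionFromDirectoryStructure(conf, "/tmp/trades", Collections.emptyMap());
    // With nested levels carried up by the recursive call, the expected result is
    // {year=[2024], month=[01, 02]}; getParquetFiles(conf, "/tmp/trades") streams both files.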
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
new file mode 100644
index 000000000..f51a633a5
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
@@ -0,0 +1,209 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  @Builder.Default
+  private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance();
+
+  @Builder.Default private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance();
+
+  @Builder.Default
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  @Builder.Default
+  private static final ParquetPartitionHelper parquetPartitionHelper =
+      ParquetPartitionHelper.getInstance();
+
+  private Map<String, List<String>> initPartitionInfo() {
+    return fsHelper.getPartitionFromDirectoryStructure(
+        hadoopConf, basePath, Collections.emptyMap());
+  }
+
+  /**
+   * To infer the schema, the most recently modified file is read, on the assumption that the
+   * latest file carries any newly added fields.
+   *
+   * @param modificationTime the commit to consider for reading the table state
+   * @return the table state derived from the latest parquet file
+   */
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+
+    Optional<LocatedFileStatus> latestFile =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .max(Comparator.comparing(FileStatus::getModificationTime));
+
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.get().getPath());
+    Schema tableSchema =
+        new org.apache.parquet.avro.AvroSchemaConverter()
+            .convert(parquetMetadata.getFileMetaData().getSchema());
+
+    Set<String> partitionKeys = initPartitionInfo().keySet();
+
+    // merge the partition columns into the original schema, as partition columns are not part of
+    // the parquet file
+    if (!partitionKeys.isEmpty()) {
+      tableSchema = mergeAvroSchema(tableSchema, partitionKeys);
+    }
+    InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema);
+
+    List<InternalPartitionField> partitionFields =
+        partitionKeys.isEmpty()
+            ? Collections.emptyList()
+            : parquetPartitionHelper.getInternalPartitionField(partitionKeys, schema);
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime()))
+        .build();
+  }
+
+  /**
+   * To build the current snapshot all files are listed; -1 is passed because getTable always
+   * reads the latest state regardless of the argument.
+   *
+   * @return the current snapshot of the table
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+
+    List<LocatedFileStatus> latestFile =
+        fsHelper.getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
+    Map<String, List<String>> partitionInfo = initPartitionInfo();
+    InternalTable table = getTable(-1L);
+    List<InternalDataFile> internalDataFiles =
+        latestFile.stream()
+            .map(
+                file ->
+                    InternalDataFile.builder()
+                        .physicalPath(file.getPath().toString())
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .fileSizeBytes(file.getLen())
+                        .partitionValues(
+                            parquetPartitionHelper.getPartitionValue(
+                                basePath,
+                                file.getPath().toString(),
+                                table.getReadSchema(),
+                                partitionInfo))
+                        .lastModified(file.getModificationTime())
+                        .columnStats(
+                            parquetMetadataExtractor.getColumnStatsForaFile(
+                                hadoopConf, file, table))
+                        .build())
+            .collect(Collectors.toList());
+
+    return InternalSnapshot.builder()
+        .table(table)
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+        .build();
+  }
+
+  /**
+   * Whenever a new file is added, it is detected by listing files whose modification time is
+   * greater than the previous sync time.
+   *
+   * @param modificationTime commit to capture table changes for.
+   * @return the files added since the given modification time
+   */
+  @Override
+  public TableChange getTableChangeForCommit(Long modificationTime) {
+    List<LocatedFileStatus> tableChanges =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
+            .collect(Collectors.toList());
+    // TODO avoid doing a full listing of the directory to get the schema; only the modification
+    // time argument needs to be tweaked
+    InternalTable internalTable = getTable(-1L);
+    Set<InternalDataFile> internalDataFiles = new HashSet<>();
+    Map<String, List<String>> partitionInfo = initPartitionInfo();
+    for (FileStatus tableStatus : tableChanges) {
+      internalDataFiles.add(
+          InternalDataFile.builder()
+              .physicalPath(tableStatus.getPath().toString())
+              .partitionValues(
+                  parquetPartitionHelper.getPartitionValue(
+                      basePath,
+                      tableStatus.getPath().toString(),
+                      internalTable.getReadSchema(),
+                      partitionInfo))
+              .lastModified(tableStatus.getModificationTime())
+              .fileSizeBytes(tableStatus.getLen())
+              .columnStats(
+                  parquetMetadataExtractor.getColumnStatsForaFile(
+                      hadoopConf, tableStatus, internalTable))
+              .build());
+    }
+
+    return TableChange.builder()
+        .tableAsOfChange(internalTable)
+        .filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build())
+        .build();
+  }
+
+  @Override
+  public CommitsBacklog<Long> getCommitsBacklog(
+      InstantsForIncrementalSync instantsForIncrementalSync) {
+
+    List<Long> commitsToProcess =
+        Collections.singletonList(instantsForIncrementalSync.getLastSyncInstant().toEpochMilli());
+
+    return CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
+  }
+
+  // TODO Need to understand how this should be implemented; whether a _SUCCESS marker or a
+  // .staging dir needs to be checked
+  @Override
+  public boolean isIncrementalSyncSafeFrom(Instant instant) {
+    return true;
+  }
+
+  @Override
+  public void close() throws IOException {}
+
+  private Schema mergeAvroSchema(Schema internalSchema, Set<String> partitionFields) {
+
+    SchemaBuilder.FieldAssembler<Schema> fieldAssembler =
+        SchemaBuilder.record(internalSchema.getName()).fields();
+    for (Schema.Field field : internalSchema.getFields()) {
+      fieldAssembler = fieldAssembler.name(field.name()).type(field.schema()).noDefault();
+    }
+
+    for (String partitionKey : partitionFields) {
+      fieldAssembler = fieldAssembler.name(partitionKey).type().stringType().noDefault();
+    }
+
+    return fieldAssembler.endRecord();
+  }
+}
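For illustration only (not part of the patch): a minimal sketch of how this source could be exercised directly, assuming a parquet dataset already exists at the path below; the table name and path are hypothetical.

    // Build the source defined above.
    ParquetConversionSource source =
        ParquetConversionSource.builder()
            .tableName("trades")
            .basePath("file:///tmp/trades")
            .hadoopConf(new Configuration())
            .build();

    // Full refresh: every parquet file under basePath is listed.
    InternalSnapshot snapshot = source.getCurrentSnapshot();

    // Incremental refresh: only files modified after the previous sync are reported as additions.
    long previousSyncMillis = Instant.now().minus(1, ChronoUnit.HOURS).toEpochMilli();
    TableChange change = source.getTableChangeForCommit(previousSyncMillis);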
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
new file mode 100644
index 000000000..8cb2ca2c7
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+
+/** A concrete implementation of {@link ConversionSourceProvider} for the Parquet table format. */
+public class ParquetConversionSourceProvider extends ConversionSourceProvider<Long> {
+  @Override
+  public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {
+
+    return ParquetConversionSource.builder()
+        .tableName(sourceTable.getName())
+        .basePath(sourceTable.getBasePath())
+        .hadoopConf(new Configuration())
+        .build();
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetInternalDataFileConvertor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetInternalDataFileConvertor.java
new file mode 100644
index 000000000..f5df4a7c5
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetInternalDataFileConvertor.java
@@ -0,0 +1,5 @@
+package org.apache.xtable.parquet;
+
+public class ParquetInternalDataFileConvertor {
+
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java
new file mode 100644
index 000000000..1c185ced0
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java
@@ -0,0 +1,69 @@
+package org.apache.xtable.parquet;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.schema.SchemaFieldFinder;
+
+public class ParquetPartitionHelper {
+  private static final ParquetPartitionHelper INSTANCE = new ParquetPartitionHelper();
+
+  public static ParquetPartitionHelper getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalPartitionField> getInternalPartitionField(
+      Set<String> partitionList, InternalSchema schema) {
+    List<InternalPartitionField> partitionFields = new ArrayList<>();
+
+    for (String partitionKey : partitionList) {
+
+      partitionFields.add(
+          InternalPartitionField.builder()
+              .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, partitionKey))
+              .transformType(PartitionTransformType.VALUE)
+              .build());
+    }
+
+    return partitionFields;
+  }
+
+  // TODO the logic is too complicated and can be simplified
+  public List<PartitionValue> getPartitionValue(
+      String basePath,
+      String filePath,
+      InternalSchema schema,
+      Map<String, List<String>> partitionInfo) {
+    List<PartitionValue> partitionValues = new ArrayList<>();
+    java.nio.file.Path base = Paths.get(basePath).normalize();
+    java.nio.file.Path file = Paths.get(filePath).normalize();
+    java.nio.file.Path relative = base.relativize(file);
+    for (Map.Entry<String, List<String>> entry : partitionInfo.entrySet()) {
+      String key = entry.getKey();
+      List<String> values = entry.getValue();
+      for (String value : values) {
+        String pathCheck = key + "=" + value;
+        if (relative.startsWith(pathCheck)) {
+          partitionValues.add(
+              PartitionValue.builder()
+                  .partitionField(
+                      InternalPartitionField.builder()
+                          .sourceField(
+                              SchemaFieldFinder.getInstance().findFieldByPath(schema, key))
+                          .transformType(PartitionTransformType.VALUE)
+                          .build())
+                  .range(Range.scalar(value))
+                  .build());
+        }
+      }
+    }
+    return partitionValues;
+  }
+}
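For illustration only (not part of the patch): a sketch of an end-to-end sync driven through the new provider, assuming the project's existing ConversionController / ConversionConfig / SourceTable / TargetTable API; table names and paths are hypothetical.

    Configuration hadoopConf = new Configuration();
    ParquetConversionSourceProvider sourceProvider = new ParquetConversionSourceProvider();
    sourceProvider.init(hadoopConf);

    SourceTable sourceTable =
        SourceTable.builder()
            .name("trades")
            .formatName(TableFormat.PARQUET)
            .basePath("file:///tmp/trades")
            .build();
    TargetTable targetTable =
        TargetTable.builder()
            .name("trades")
            .formatName(TableFormat.ICEBERG)
            .basePath("file:///tmp/trades")
            .build();
    ConversionConfig conversionConfig =
        ConversionConfig.builder()
            .sourceTable(sourceTable)
            .targetTables(Collections.singletonList(targetTable))
            .syncMode(SyncMode.FULL)
            .build();

    // Run the sync; the controller resolves the source via the provider above.
    new ConversionController(hadoopConf).sync(conversionConfig, sourceProvider);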
diff --git a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
index c80c939bf..9d3686fb4 100644
--- a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
+++ b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
@@ -27,6 +27,8 @@
 ##     configuration: A map of configuration values specific to this converter.
 tableFormatConverters:
+  PARQUET:
+    conversionSourceProviderClass: org.apache.xtable.parquet.ParquetConversionSourceProvider
   HUDI:
     conversionSourceProviderClass: org.apache.xtable.hudi.HudiConversionSourceProvider
   DELTA:
@@ -37,4 +39,4 @@ tableFormatConverters:
       spark.app.name: xtable
   ICEBERG:
     conversionSourceProviderClass: org.apache.xtable.iceberg.IcebergConversionSourceProvider
-    conversionTargetProviderClass: org.apache.xtable.iceberg.IcebergConversionTarget
\ No newline at end of file
+    conversionTargetProviderClass: org.apache.xtable.iceberg.IcebergConversionTarget
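For illustration only (not part of the patch): with the PARQUET converter registered above, a RunSync dataset config could name PARQUET as the source format. This assumes the existing RunSync config layout; the path and table name are hypothetical.

    sourceFormat: PARQUET
    targetFormats:
      - DELTA
      - ICEBERG
    datasets:
      -
        tableBasePath: file:///tmp/trades
        tableName: trades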