Sync parquet to other file formats #592
base: main
Changes from all commits: 951b31c, 24efa0f, 9a4690f, fc4dfab, c9f52f8
@@ -0,0 +1,81 @@
package org.apache.xtable.parquet;

import java.io.IOException;
import java.util.*;
Review comment: nitpick: we're avoiding the use of …
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

public class FileSystemHelper {
Review comment: I think it would be a good idea to define an interface for getting the parquet files for the table and the changes since the last run. Right now this is all done through file listing, but we should consider a case where someone implements a way to poll the changes through S3 events. (A sketch of such an interface follows this file's diff.)
  private static final FileSystemHelper INSTANCE = new FileSystemHelper();

  public static FileSystemHelper getInstance() {
    return INSTANCE;
  }

  public Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
    try {
      FileSystem fs = FileSystem.get(hadoopConf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(basePath), true);
      return remoteIteratorToStream(iterator)
          .filter(file -> file.getPath().getName().endsWith("parquet"));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
  public Map<String, List<String>> getPartitionFromDirectoryStructure(
      Configuration hadoopConf, String basePath, Map<String, List<String>> partitionMap) {
    try {
      FileSystem fs = FileSystem.get(hadoopConf);
      FileStatus[] baseFileStatus = fs.listStatus(new Path(basePath));
      Map<String, List<String>> currentPartitionMap = new HashMap<>(partitionMap);

      for (FileStatus dirStatus : baseFileStatus) {
        if (dirStatus.isDirectory()) {
          String partitionPath = dirStatus.getPath().getName();
          if (partitionPath.contains("=")) {
            String[] partitionKeyValue = partitionPath.split("=");
            currentPartitionMap
                .computeIfAbsent(partitionKeyValue[0], k -> new ArrayList<>())
                .add(partitionKeyValue[1]);
            // Recurse into nested partition directories and accumulate their partition values.
            currentPartitionMap =
                getPartitionFromDirectoryStructure(
                    hadoopConf, dirStatus.getPath().toString(), currentPartitionMap);
          }
        }
      }
      return currentPartitionMap;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
  private static <T> Stream<T> remoteIteratorToStream(RemoteIterator<T> remoteIterator) {
    Iterator<T> iterator =
        new Iterator<T>() {
          @Override
          public boolean hasNext() {
            try {
              return remoteIterator.hasNext();
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
          }

          @Override
          public T next() {
            try {
              return remoteIterator.next();
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
          }
        };
    return StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
  }
}
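To make the review suggestion above concrete, here is a minimal sketch of what such an interface could look like. The name ParquetFileDiscovery and both method signatures are illustrative assumptions, not part of this PR; the current FileSystemHelper would be the listing-based implementation, and an S3-event-driven implementation could be added later.

package org.apache.xtable.parquet;

import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocatedFileStatus;

/** Hypothetical abstraction over how the parquet files backing a table are discovered. */
public interface ParquetFileDiscovery {

  /** All parquet data files that currently make up the table. */
  Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath);

  /** Parquet files added or rewritten since the last sync point. */
  Stream<LocatedFileStatus> getFilesModifiedAfter(
      Configuration hadoopConf, String basePath, long lastSyncModificationTime);
}

ParquetConversionSource could then depend on this interface rather than on FileSystemHelper directly.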
@@ -0,0 +1,209 @@
package org.apache.xtable.parquet;

import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.NonNull;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.xtable.avro.AvroSchemaConverter;
import org.apache.xtable.model.*;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.*;
import org.apache.xtable.spi.extractor.ConversionSource;

@Builder
public class ParquetConversionSource implements ConversionSource<Long> {
Review comment: I think it may be a bit more robust if we use a time interval instead of a single …
Reply: Can you explain a bit about the interval and how you are envisioning it? This Long is the last synced modification time of a file, so the next run lists files with a greater modification time and the new files are synced.
Reply: I was thinking of an interval since it can also easily show where the start time was for the sync. This could be useful when the targets fall out of sync with each other. Currently if there are commits … (An interval-based sync marker is sketched after this class.)
  private final String tableName;
  private final String basePath;
  @NonNull private final Configuration hadoopConf;

  @Builder.Default
  private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance();

  @Builder.Default private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance();

  @Builder.Default
  private static final ParquetMetadataExtractor parquetMetadataExtractor =
      ParquetMetadataExtractor.getInstance();

Review comment: Is there an implementation for this class missing?
Reply: I added it initially but I am not using it; I will remove it.
  @Builder.Default
  private static final ParquetPartitionHelper parquetPartitionHelper =
      ParquetPartitionHelper.getInstance();

  private Map<String, List<String>> initPartitionInfo() {
    return fsHelper.getPartitionFromDirectoryStructure(
        hadoopConf, basePath, Collections.emptyMap());
  }

  /**
   * To infer the schema, the latest file is read; the assumption is that the latest file will
   * contain any newly added fields.
   *
   * @param modificationTime the commit to consider for reading the table state
   * @return the table state derived from the latest parquet file
   */
  @Override
  public InternalTable getTable(Long modificationTime) {

    Optional<LocatedFileStatus> latestFile =
        fsHelper
            .getParquetFiles(hadoopConf, basePath)
            .max(Comparator.comparing(FileStatus::getModificationTime));
Review comment: Is there a way to push down this filter so we don't need to iterate through all files under the base path? Maybe we can even limit the file listing to return files created after the modificationTime?
Reply: I don't think we can push down with the API; even to filter files greater than the modification time, we have to list first and then filter. Do you have any other ideas in mind for this?
Reply: No, just wanted to see if it was possible, to help with large tables. (A filtering overload is sketched after this method.)
    ParquetMetadata parquetMetadata =
        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.get().getPath());
    Schema tableSchema =
        new org.apache.parquet.avro.AvroSchemaConverter()
            .convert(parquetMetadata.getFileMetaData().getSchema());

    Set<String> partitionKeys = initPartitionInfo().keySet();

    // Merge the partition columns into the original schema, since partitions are not part of the
    // parquet file itself.
    if (!partitionKeys.isEmpty()) {
      tableSchema = mergeAvroSchema(tableSchema, partitionKeys);
    }
    InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema);

    List<InternalPartitionField> partitionFields =
        partitionKeys.isEmpty()
            ? Collections.emptyList()
            : parquetPartitionHelper.getInternalPartitionField(partitionKeys, schema);
    DataLayoutStrategy dataLayoutStrategy =
        partitionFields.isEmpty()
            ? DataLayoutStrategy.FLAT
            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
    return InternalTable.builder()
        .tableFormat(TableFormat.PARQUET)
        .basePath(basePath)
        .name(tableName)
        .layoutStrategy(dataLayoutStrategy)
        .partitioningFields(partitionFields)
        .readSchema(schema)
        .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime()))
        .build();
  }
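As the thread above concludes, the Hadoop FileSystem listing API cannot push a modification-time predicate down to the storage layer, so the listing still walks every file. The best available option is to apply the filter while the listing streams, before anything is collected; a hedged sketch of such an overload on FileSystemHelper (not part of this PR) could look like:

  // Hypothetical addition to FileSystemHelper: same recursive listing, but older files are
  // dropped inside the stream so callers never collect them.
  public Stream<LocatedFileStatus> getParquetFiles(
      Configuration hadoopConf, String basePath, long minModificationTime) {
    return getParquetFiles(hadoopConf, basePath)
        .filter(file -> file.getModificationTime() > minModificationTime);
  }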
  /**
   * To get the current snapshot, all files are listed; hence -1L is passed as the modification
   * time.
   *
   * @return the current snapshot of the table
   */
  @Override
  public InternalSnapshot getCurrentSnapshot() {

    List<LocatedFileStatus> latestFile =
        fsHelper.getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
    Map<String, List<String>> partitionInfo = initPartitionInfo();
    InternalTable table = getTable(-1L);
    List<InternalDataFile> internalDataFiles =
        latestFile.stream()
            .map(
                file ->
                    InternalDataFile.builder()
                        .physicalPath(file.getPath().toString())
                        .fileFormat(FileFormat.APACHE_PARQUET)
                        .fileSizeBytes(file.getLen())
                        .partitionValues(
                            parquetPartitionHelper.getPartitionValue(
                                basePath,
                                file.getPath().toString(),
                                table.getReadSchema(),
                                partitionInfo))
                        .lastModified(file.getModificationTime())
                        .columnStats(
                            parquetMetadataExtractor.getColumnStatsForaFile(
                                hadoopConf, file, table))
                        .build())
            .collect(Collectors.toList());

    return InternalSnapshot.builder()
        .table(table)
        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
        .build();
  }

Review comment: Can the logic for this conversion move to a common method that can also be called from the …
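One possible shape for the shared method suggested in the comment above, sketched as a private helper on ParquetConversionSource that simply factors out the builder calls already used in getCurrentSnapshot and getTableChangeForCommit; the name toInternalDataFile is an illustrative assumption:

  // Hypothetical private helper so the snapshot path and the incremental path build
  // InternalDataFile instances the same way.
  private InternalDataFile toInternalDataFile(
      FileStatus file, InternalTable table, Map<String, List<String>> partitionInfo) {
    return InternalDataFile.builder()
        .physicalPath(file.getPath().toString())
        .fileFormat(FileFormat.APACHE_PARQUET)
        .fileSizeBytes(file.getLen())
        .partitionValues(
            parquetPartitionHelper.getPartitionValue(
                basePath, file.getPath().toString(), table.getReadSchema(), partitionInfo))
        .lastModified(file.getModificationTime())
        .columnStats(parquetMetadataExtractor.getColumnStatsForaFile(hadoopConf, file, table))
        .build();
  }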
  /**
   * Whenever a new file is added, it is picked up by listing the files whose modification time is
   * greater than the previous sync.
   *
   * @param modificationTime commit to capture table changes for.
   * @return the table changes since the given modification time
   */
  @Override
  public TableChange getTableChangeForCommit(Long modificationTime) {
    List<FileStatus> tableChanges =
        fsHelper
            .getParquetFiles(hadoopConf, basePath)
            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
            .collect(Collectors.toList());
    // TODO avoid doing a full listing of the directory to get the schema; only the modification
    // time argument needs to be tweaked.
    InternalTable internalTable = getTable(-1L);

Review comment: This call will also scan the file system if I'm understanding this correctly; can we avoid that? It can be expensive for large tables. (A possible restructuring is sketched after this method.)
    Set<InternalDataFile> internalDataFiles = new HashSet<>();
    Map<String, List<String>> partitionInfo = initPartitionInfo();
    for (FileStatus tableStatus : tableChanges) {
      internalDataFiles.add(
          InternalDataFile.builder()
              .physicalPath(tableStatus.getPath().toString())
              .partitionValues(
                  parquetPartitionHelper.getPartitionValue(
                      basePath,
                      tableStatus.getPath().toString(),
                      internalTable.getReadSchema(),
                      partitionInfo))
              .lastModified(tableStatus.getModificationTime())
              .fileSizeBytes(tableStatus.getLen())
              .columnStats(
                  parquetMetadataExtractor.getColumnStatsForaFile(
                      hadoopConf, tableStatus, internalTable))
              .build());
    }

Review comment: We also need …

    return TableChange.builder()
        .tableAsOfChange(internalTable)
        .filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build())
        .build();
  }
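A hedged sketch of how the extra scan could be avoided: derive the table state from the newest file already present in tableChanges instead of listing the base path again. The helper getTableFromFile used here is hypothetical (it does not exist in this PR) and would hold the schema and partition logic of getTable minus the listing step.

    // Hypothetical restructuring: reuse the newest changed file instead of re-listing the table.
    Optional<FileStatus> newestChangedFile =
        tableChanges.stream().max(Comparator.comparing(FileStatus::getModificationTime));
    InternalTable internalTable =
        newestChangedFile.map(this::getTableFromFile).orElseGet(() -> getTable(-1L));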
  @Override
  public CommitsBacklog<Long> getCommitsBacklog(
      InstantsForIncrementalSync instantsForIncrementalSync) {

    List<Long> commitsToProcess =
        Collections.singletonList(instantsForIncrementalSync.getLastSyncInstant().toEpochMilli());

    return CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
  }

  // TODO Need to understand how this should be implemented: whether _SUCCESS or a .staging dir
  // needs to be checked.
  @Override
  public boolean isIncrementalSyncSafeFrom(Instant instant) {
    return true;
  }

  @Override
  public void close() throws IOException {}
  private Schema mergeAvroSchema(Schema internalSchema, Set<String> partitionFields) {

    SchemaBuilder.FieldAssembler<Schema> fieldAssembler =
        SchemaBuilder.record(internalSchema.getName()).fields();
    for (Schema.Field field : internalSchema.getFields()) {
      fieldAssembler = fieldAssembler.name(field.name()).type(field.schema()).noDefault();
    }

Review comment: Can the internal schema have defaults? Can it also have docs on fields? Those would be dropped with this code. (A merge that preserves them is sketched after this method.)
    for (String partitionKey : partitionFields) {
      fieldAssembler = fieldAssembler.name(partitionKey).type().stringType().noDefault();
    }

    return fieldAssembler.endRecord();
  }
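A hedged sketch of a merge that carries per-field doc strings and default values over, responding to the comment above. Whether the incoming schema actually carries defaults is the open question, so this is illustrative only; the method name is an assumption.

  // Hypothetical variant of mergeAvroSchema that preserves per-field docs and defaults.
  private Schema mergeAvroSchemaPreservingFieldMetadata(
      Schema sourceSchema, Set<String> partitionFields) {
    List<Schema.Field> fields = new ArrayList<>();
    for (Schema.Field field : sourceSchema.getFields()) {
      // Copy doc and default value instead of dropping them with noDefault().
      fields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
    }
    for (String partitionKey : partitionFields) {
      // Partition columns are synthesized from the directory layout, so they carry no doc/default.
      fields.add(new Schema.Field(partitionKey, Schema.create(Schema.Type.STRING), null, null));
    }
    return Schema.createRecord(
        sourceSchema.getName(),
        sourceSchema.getDoc(),
        sourceSchema.getNamespace(),
        sourceSchema.isError(),
        fields);
  }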
}
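Picking up the interval discussion attached to this class: a minimal sketch of what an interval-based sync marker could look like, assuming the name SyncInterval and its shape are illustrative and that the source would then implement ConversionSource<SyncInterval> instead of ConversionSource<Long>.

package org.apache.xtable.parquet;

import java.time.Instant;

/** Hypothetical sync marker carrying an explicit time window instead of a single Long. */
public final class SyncInterval {
  private final Instant start; // where the previous sync left off
  private final Instant end; // upper bound for files considered in this run

  public SyncInterval(Instant start, Instant end) {
    this.start = start;
    this.end = end;
  }

  public Instant getStart() {
    return start;
  }

  public Instant getEnd() {
    return end;
  }

  /** True if a file modified at the given epoch millis falls inside (start, end]. */
  public boolean contains(long modificationTimeMillis) {
    Instant t = Instant.ofEpochMilli(modificationTimeMillis);
    return t.isAfter(start) && !t.isAfter(end);
  }
}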
@@ -0,0 +1,36 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.xtable.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.xtable.conversion.ConversionSourceProvider;
import org.apache.xtable.conversion.SourceTable;

/** A concrete implementation of {@link ConversionSourceProvider} for the Parquet table format. */
public class ParquetConversionSourceProvider extends ConversionSourceProvider<Long> {
  @Override
  public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {

    return ParquetConversionSource.builder()
        .tableName(sourceTable.getName())
        .basePath(sourceTable.getBasePath())
        .hadoopConf(new Configuration())
Review comment: there is an …
        .build();
  }
}
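The truncated comment above is attached to the hadoopConf(new Configuration()) call. If the intent is to reuse a Hadoop configuration the provider already holds (an assumption about ConversionSourceProvider that is not verified here), the method could forward it instead of creating a fresh one:

  @Override
  public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {
    return ParquetConversionSource.builder()
        .tableName(sourceTable.getName())
        .basePath(sourceTable.getBasePath())
        // Assumes the provider's configured Hadoop conf is reachable here (hypothetical field).
        .hadoopConf(hadoopConf)
        .build();
  }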
@@ -0,0 +1,5 @@
package org.apache.xtable.parquet;

public class ParquetInternalDataFileConvertor {}
Review comment: Please run mvn spotless:apply to clean up some of the formatting issues in the draft.