[HUDI-1469] Faster initialization of metadata table using parallelized listing. (apache#2343)

* [HUDI-1469] Faster initialization of metadata table using parallelized listing which finds partitions and files in a single scan.
* MINOR fixes

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
prashantwason and vinothchandar committed Dec 30, 2020
1 parent 80f929e commit d4c9642
Showing 2 changed files with 71 additions and 35 deletions.
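
The idea in the commit message, finding partitions and files in a single Spark-parallelized scan instead of listing each partition separately, can be sketched in isolation as follows. This is a simplified illustration rather than the code in this commit: the actual implementation (getPartitionsToFilesMapping in the second file below) also recognizes hoodie partitions via the partition metafile, skips the .hoodie metafolder, and reuses a SerializableConfiguration. All names and values here are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ParallelListingSketch {

  // Breadth-first scan of a table's base path: every pass lists the queued directories in
  // parallel on the executors and queues the sub-directories it finds for the next pass.
  public static List<FileStatus> listAllFiles(JavaSparkContext jsc, String basePath, int parallelism) {
    List<String> pathsToList = new LinkedList<>();
    pathsToList.add(basePath);
    List<FileStatus> allFiles = new ArrayList<>();

    while (!pathsToList.isEmpty()) {
      int numTasks = Math.min(parallelism, pathsToList.size());
      List<FileStatus[]> listings = jsc.parallelize(pathsToList, numTasks)
          .map(pathStr -> {
            // A fresh Configuration per task keeps the sketch simple; the commit instead captures
            // a SerializableConfiguration. Collecting FileStatus objects also assumes the
            // configured Spark serializer can handle them.
            Path path = new Path(pathStr);
            FileSystem fs = path.getFileSystem(new Configuration());
            return fs.listStatus(path);
          })
          .collect();

      pathsToList.clear();
      for (FileStatus[] listing : listings) {
        for (FileStatus status : listing) {
          if (status.isDirectory()) {
            pathsToList.add(status.getPath().toString()); // descend on the next pass
          } else {
            allFiles.add(status);
          }
        }
      }
    }
    return allFiles;
  }
}

Invoked as, say, listAllFiles(jsc, basePath, 100), it performs one parallelized listing round per directory level rather than one serial listing call per partition.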
@@ -73,6 +73,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.parallelism";
public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
@@ -213,6 +214,10 @@ public int getDeleteShuffleParallelism() {
return Math.max(Integer.parseInt(props.getProperty(DELETE_PARALLELISM)), 1);
}

public int getFileListingParallelism() {
return Math.max(Integer.parseInt(props.getProperty(FILE_LISTING_PARALLELISM)), 1);
}

public int getRollbackParallelism() {
return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
}
@@ -870,6 +875,11 @@ public Builder withDeleteParallelism(int parallelism) {
return this;
}

public Builder withFileListingParallelism(int parallelism) {
props.setProperty(FILE_LISTING_PARALLELISM, String.valueOf(parallelism));
return this;
}

public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
props.setProperty(INSERT_PARALLELISM, String.valueOf(insertShuffleParallelism));
props.setProperty(UPSERT_PARALLELISM, String.valueOf(upsertShuffleParallelism));
@@ -1024,6 +1034,8 @@ protected void setDefaults() {
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_PARALLELISM), FILE_LISTING_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
DEFAULT_ROLLBACK_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_USING_MARKERS), ROLLBACK_USING_MARKERS,
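
For orientation, a minimal sketch of how a job could raise the new listing parallelism through the builder method added above; the path, table name, and parallelism values are illustrative, and the many other write options a real job would set are omitted.

import org.apache.hudi.config.HoodieWriteConfig;

public class ListingParallelismConfigExample {
  public static void main(String[] args) {
    // Illustrative values only; a real job would set many more write options.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/example_table")
        .forTable("example_table")
        .withParallelism(200, 200)          // insert / upsert shuffle parallelism
        .withDeleteParallelism(200)
        .withFileListingParallelism(500)    // backs hoodie.file.listing.parallelism
        .build();

    // When the property is not set, setDefaults() above falls back to DEFAULT_PARALLELISM.
    System.out.println(config.getFileListingParallelism()); // 500
  }
}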
@@ -52,6 +52,7 @@
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -67,7 +68,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

@@ -82,8 +82,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import scala.Tuple2;

import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
@@ -196,6 +194,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) {
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
.withFileListingParallelism(writeConfig.getFileListingParallelism())
.withFinalizeWriteParallelism(parallelism);

if (writeConfig.isMetricsOn()) {
@@ -311,43 +310,17 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
initTableMetadata();

// List all partitions in the basePath of the containing dataset
FileSystem fs = datasetMetaClient.getFs();
FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
datasetWriteConfig.shouldAssumeDatePartitioning());
List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");

// List all partitions in parallel and collect the files in them
int parallelism = Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
.mapToPair(partition -> {
FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
return new Tuple2<>(partition, statuses);
});

// Collect the list of partitions and file lists
List<Tuple2<String, FileStatus[]>> partitionFileList = partitionFileListRDD.collect();
LOG.info("Initializing metadata table by using file listings in " + datasetWriteConfig.getBasePath());
Map<String, List<FileStatus>> partitionToFileStatus = getPartitionsToFilesMapping(jsc, datasetMetaClient);

// Create a HoodieCommitMetadata with writeStats for all discovered files
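// Single-element array so the lambdas below can update the running count of files added to the metadata.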
int[] stats = {0};
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();

partitionFileList.forEach(t -> {
final String partition = t._1;
try {
if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
return;
}
} catch (IOException e) {
throw new HoodieMetadataException("Failed to check partition " + partition, e);
}

partitionToFileStatus.forEach((partition, statuses) -> {
// Filter the statuses to only include files which were created before or on createInstantTime
Arrays.stream(t._2).filter(status -> {
statuses.stream().filter(status -> {
String filename = status.getPath().getName();
if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
return false;
}
if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
createInstantTime)) {
return false;
@@ -370,10 +343,59 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
}
});

LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
}

/**
* Function to find hoodie partitions and list files in them in parallel.
*
* @param jsc               Spark context used to parallelize the directory listings
* @param datasetMetaClient meta client of the dataset whose partitions and files are being listed
* @return Map of partition names to a list of FileStatus for all the files in the partition
*/
private Map<String, List<FileStatus>> getPartitionsToFilesMapping(JavaSparkContext jsc, HoodieTableMetaClient datasetMetaClient) {

List<Path> pathsToList = new LinkedList<>();
pathsToList.add(new Path(datasetWriteConfig.getBasePath()));

Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());

while (!pathsToList.isEmpty()) {
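// Breadth-first traversal: every pass lists the queued directories in parallel and queues any
// sub-directories it discovers for the next pass.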
int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
// List all directories in parallel
List<Pair<Path, FileStatus[]>> dirToFileListing = jsc.parallelize(pathsToList, listingParallelism)
.map(path -> {
FileSystem fs = path.getFileSystem(conf.get());
return Pair.of(path, fs.listStatus(path));
}).collect();
pathsToList.clear();

// If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
// the results.
dirToFileListing.forEach(p -> {
List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
.filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
.collect(Collectors.toList());

if (p.getRight().length > filesInDir.size()) {
// The partition metafile was filtered out above, so this directory is a hoodie partition. Add all its data files to the result.
partitionToFileStatus.put(p.getLeft().getName(), filesInDir);
} else {
// Add sub-dirs to the queue
pathsToList.addAll(Arrays.stream(p.getRight())
.filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
.map(fs -> fs.getPath())
.collect(Collectors.toList()));
}
});
}

return partitionToFileStatus;
}

/**
* Sync the Metadata Table from the instants created on the dataset.
*
@@ -454,7 +476,9 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
writeStats.forEach(hoodieWriteStat -> {
String pathWithPartition = hoodieWriteStat.getPath();
if (pathWithPartition == null) {
throw new HoodieMetadataException("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
// Empty partition: there is no path to record, so log a warning and skip instead of failing the commit
LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
return;
}

int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
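
As a side note on the bootstrap filter earlier in this file (the compareTimestamps check in bootstrapFromFilesystem): only files whose commit time, as encoded in the file name, is not greater than the bootstrap instant are recorded. A small illustration with a hypothetical file name and illustrative instant times:

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

public class BootstrapFileFilterExample {
  public static void main(String[] args) {
    // Hypothetical base file name of the form <fileId>_<writeToken>_<instantTime>.<ext>.
    String filename = "f1d3a7c2-0000_1-2-3_20201230093000.parquet";
    String fileCommitTime = FSUtils.getCommitTime(filename);   // "20201230093000"
    String createInstantTime = "20201230120000";                // bootstrap instant, illustrative

    // Mirrors the filter in bootstrapFromFilesystem: files written after the bootstrap
    // instant are skipped; this one is older, so it would be recorded.
    boolean writtenAfter = HoodieTimeline.compareTimestamps(
        fileCommitTime, HoodieTimeline.GREATER_THAN, createInstantTime);
    System.out.println(writtenAfter); // false
  }
}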
