Skip to content

Commit

Permalink
MINOR fixes
Browse files: browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Dec 21, 2020
1 parent 828a51a commit d71922d
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 20 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -73,7 +73,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
- public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.shuffle.parallelism";
+ public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.parallelism";
public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
- import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

Expand All @@ -83,8 +82,6 @@
import java.util.Set;
import java.util.stream.Collectors;

- import scala.Tuple2;

import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
Expand Down Expand Up @@ -313,21 +310,7 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
initTableMetadata();

// List all partitions in the basePath of the containing dataset
- FileSystem fs = datasetMetaClient.getFs();
- FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
- datasetWriteConfig.shouldAssumeDatePartitioning());
- List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
- LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
-
- // List all partitions in parallel and collect the files in them
- int parallelism = Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
- JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
- .mapToPair(partition -> {
- FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
- return new Tuple2<>(partition, statuses);
- });
LOG.info("Initializing metadata table by using file listings in " + datasetWriteConfig.getBasePath());

Map<String, List<FileStatus>> partitionToFileStatus = getPartitionsToFilesMapping(jsc, datasetMetaClient);

// Create a HoodieCommitMetadata with writeStats for all discovered files
Expand Down Expand Up @@ -371,10 +354,11 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
* @param datasetMetaClient
* @return Map of partition names to a list of FileStatus for all the files in the partition
*/
- private Map<String, List<FileStatus>> getPartitionsToFilesMapping(JavaSparkContext jsc,
- HoodieTableMetaClient datasetMetaClient) {
+ private Map<String, List<FileStatus>> getPartitionsToFilesMapping(JavaSparkContext jsc, HoodieTableMetaClient datasetMetaClient) {

List<Path> pathsToList = new LinkedList<>();
pathsToList.add(new Path(datasetWriteConfig.getBasePath()));

Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
Expand Down

0 comments on commit d71922d

Please sign in to comment.