[HUDI-1354] Block updates and replace on file groups in clustering
lw309637554 committed Dec 27, 2020
1 parent f6c4862 commit 589e9b1
Showing 9 changed files with 187 additions and 121 deletions.
HoodieClusteringConfig.java
@@ -19,8 +19,6 @@
package org.apache.hudi.config;

import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.table.action.clustering.update.RejectUpdateStrategy;
import org.apache.hudi.table.action.clustering.update.UpdateStrategy;

import java.io.File;
import java.io.FileReader;
@@ -75,11 +73,16 @@ public class HoodieClusteringConfig extends DefaultHoodieConfig {
public static final String CLUSTERING_TARGET_FILE_MAX_BYTES = CLUSTERING_STRATEGY_PARAM_PREFIX + "target.file.max.bytes";
public static final String DEFAULT_CLUSTERING_TARGET_FILE_MAX_BYTES = String.valueOf(1 * 1024 * 1024 * 1024L); // 1GB

// constants related to clustering that may be used by more than 1 strategy.
// Constants related to clustering that may be used by more than 1 strategy.
public static final String CLUSTERING_SORT_COLUMNS_PROPERTY = HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX + "sort.columns";


// When file groups are under clustering, updates to these file groups need to be handled. The default strategy simply rejects the update
public static final String CLUSTERING_UPDATES_STRATEGY_PROP = "hoodie.clustering.updates.strategy";
public static final String DEFAULT_CLUSTERING_UPDATES_STRATEGY = RejectUpdateStrategy.class.getName();
public static final String DEFAULT_CLUSTERING_UPDATES_STRATEGY = "org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy";

// Async clustering
public static final String ASYNC_CLUSTERING_ENABLE_OPT_KEY = "hoodie.clustering.async.enabled";
public static final String DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL = "false";

public HoodieClusteringConfig(Properties props) {
super(props);
@@ -140,8 +143,8 @@ public Builder withClusteringTargetFileMaxBytes(long targetFileSize) {
return this;
}

public Builder withInlineClustering(Boolean inlineCompaction) {
props.setProperty(INLINE_CLUSTERING_PROP, String.valueOf(inlineCompaction));
public Builder withInlineClustering(Boolean inlineClustering) {
props.setProperty(INLINE_CLUSTERING_PROP, String.valueOf(inlineClustering));
return this;
}

@@ -155,8 +158,13 @@ public Builder fromProperties(Properties props) {
return this;
}

public Builder withClusteringUpdatesStrategy(UpdateStrategy updatesStrategy) {
props.setProperty(CLUSTERING_UPDATES_STRATEGY_PROP, updatesStrategy.getClass().getName());
public Builder withClusteringUpdatesStrategy(String updatesStrategyClass) {
props.setProperty(CLUSTERING_UPDATES_STRATEGY_PROP, updatesStrategyClass);
return this;
}

public Builder withAsyncClustering(Boolean asyncClustering) {
props.setProperty(ASYNC_CLUSTERING_ENABLE_OPT_KEY, String.valueOf(asyncClustering));
return this;
}

@@ -183,6 +191,8 @@ public HoodieClusteringConfig build() {
DEFAULT_CLUSTERING_PLAN_SMALL_FILE_LIMIT);
setDefaultOnCondition(props, !props.containsKey(CLUSTERING_UPDATES_STRATEGY_PROP), CLUSTERING_UPDATES_STRATEGY_PROP,
DEFAULT_CLUSTERING_UPDATES_STRATEGY);
setDefaultOnCondition(props, !props.containsKey(ASYNC_CLUSTERING_ENABLE_OPT_KEY), ASYNC_CLUSTERING_ENABLE_OPT_KEY,
DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL);
return config;
}
}
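For a quick sense of how the builder changes above fit together, here is a minimal configuration sketch. It assumes the usual HoodieClusteringConfig.newBuilder() factory; the strategy value is the new default SparkRejectUpdateStrategy class name, and everything else is illustrative.

import org.apache.hudi.config.HoodieClusteringConfig;

// Sketch only: wire up the options touched by this commit.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
    .withInlineClustering(true)      // existing flag; only the parameter name changed here
    .withAsyncClustering(false)      // new flag backed by hoodie.clustering.async.enabled
    .withClusteringUpdatesStrategy(  // now takes a class name instead of an instance
        "org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy")
    .build();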
HoodieWriteConfig.java
@@ -32,7 +32,6 @@
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.action.clustering.update.UpdateStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;

import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -396,6 +395,15 @@ public boolean isInlineClustering() {
return Boolean.parseBoolean(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_PROP));
}

public boolean isAsyncClusteringEnabled() {
return Boolean.parseBoolean(props.getProperty(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY));
}

public boolean isClusteringEnabled() {
// TODO: future support async clustering
return isInlineClustering() || isAsyncClusteringEnabled();
}

public int getInlineClusterMaxCommits() {
return Integer.parseInt(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP));
}
@@ -416,8 +424,8 @@ public Boolean shouldCleanBootstrapBaseFile() {
return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLED));
}

public UpdateStrategy getClusteringUpdatesStrategy() {
return ReflectionUtils.loadClass(props.getProperty(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP));
public String getClusteringUpdatesStrategyClass() {
return props.getProperty(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP);
}

/**
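A small sketch of the raw properties behind the two new accessors; the key strings come from the HoodieClusteringConfig constants above, and the wiring around them is illustrative only.

import java.util.Properties;

// Sketch: properties the new accessors read (values illustrative).
Properties props = new Properties();
props.setProperty("hoodie.clustering.async.enabled", "true");
props.setProperty("hoodie.clustering.updates.strategy",
    "org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy");
// With async clustering on, isAsyncClusteringEnabled() returns true, so
// isClusteringEnabled() is true even if inline clustering stays off, and the
// configured update strategy is applied during upsert execution.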
UpdateStrategy.java
@@ -16,26 +16,33 @@
* limitations under the License.
*/

package org.apache.hudi.table.action.clustering.update;
package org.apache.hudi.table.action.cluster.strategy;

import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.common.model.HoodieRecordPayload;

import java.util.List;
import java.util.Set;

/**
* When file groups are under clustering, records written to these file groups need to be checked.
*/
public interface UpdateStrategy {
public abstract class UpdateStrategy<T extends HoodieRecordPayload<T>, I> {

protected final HoodieEngineContext engineContext;
protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;

protected UpdateStrategy(HoodieEngineContext engineContext, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
this.engineContext = engineContext;
this.fileGroupsInPendingClustering = fileGroupsInPendingClustering;
}

/**
* check the update records to the file group in clustering.
* @param fileGroupsInPendingClustering
* @param workloadProfile workloadProfile have the records update info,
* just like BaseSparkCommitActionExecutor.getUpsertPartitioner use it.
* Check the records being updated against the file groups under clustering.
* @param taggedRecordsRDD the records to write, tagged with their target file id;
* in the future, tagged records may be re-routed to a different fileId.
* @return the records RDD after the strategy has been applied
*/
void apply(List<Pair<HoodieFileGroupId, HoodieInstant>> fileGroupsInPendingClustering, WorkloadProfile workloadProfile);
public abstract I handleUpdate(I taggedRecordsRDD);

}

This file was deleted.
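The UpdateStrategy change above (interface to abstract class) parameterizes the strategy by a payload type T and an engine-specific record container I. As an illustration of that contract, a hypothetical alternative strategy (not part of this commit) might look like the sketch below; it mirrors the constructor shape of the default Spark strategy so it could be plugged in via hoodie.clustering.updates.strategy.

package com.example.hudi; // hypothetical package, for illustration only

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.spark.api.java.JavaRDD;

import java.util.HashSet;

/**
 * Hypothetical strategy that lets updates through unchanged, leaving conflict
 * handling to later stages. Demonstrates specializing UpdateStrategy<T, I> for
 * Spark, where I is JavaRDD<HoodieRecord<T>>.
 */
public class AllowUpdateStrategy<T extends HoodieRecordPayload<T>>
    extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {

  public AllowUpdateStrategy(HoodieSparkEngineContext engineContext,
                             HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
    super(engineContext, fileGroupsInPendingClustering);
  }

  @Override
  public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
    // No rejection: pass the tagged records through as-is.
    return taggedRecordsRDD;
  }
}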

BaseCommitActionExecutor.java
@@ -32,7 +32,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
@@ -50,7 +49,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I, K, O, R>
extends BaseActionExecutor<T, I, K, O, R> {
@@ -70,15 +68,6 @@ public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig c
this.taskContextSupplier = context.getTaskContextSupplier();
}

protected void clusteringUpdateCheck(WorkloadProfile profile) {
if (profile == null) {
throw new HoodieException("Need workload profile to check clustering update now.");
}
// apply clustering update strategy.
config.getClusteringUpdatesStrategy()
.apply(table.getFileSystemView().getFileGroupsInPendingClustering().collect(Collectors.toList()), profile);
}

public abstract HoodieWriteMetadata<O> execute(I inputRecords);

/**
SparkRejectUpdateStrategy.java (new file)
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.clustering.update.strategy;

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.exception.HoodieClusteringUpdateException;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;

import java.util.HashSet;
import java.util.List;

/**
* Update strategy based on the following rule:
* if any file group under pending clustering receives an update record, throw an exception.
*/
public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
private static final Logger LOG = LogManager.getLogger(SparkRejectUpdateStrategy.class);

public SparkRejectUpdateStrategy(HoodieSparkEngineContext engineContext, HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
super(engineContext, fileGroupsInPendingClustering);
}

private List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
List<HoodieFileGroupId> fileGroupIdsWithUpdates = inputRecords
.filter(record -> record.getCurrentLocation() != null)
.map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collect();
return fileGroupIdsWithUpdates;
}

@Override
public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) {
String msg = String.format("Not allowed to update the clustering file group %s. "
+ "For pending clustering operations, we are not going to support update for now.",
fileGroupIdWithRecordUpdate.toString());
LOG.error(msg);
throw new HoodieClusteringUpdateException(msg);
}
});
return taggedRecordsRDD;
}

}
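A hedged usage sketch of the new default strategy, used directly rather than via the reflective loading shown in BaseSparkCommitActionExecutor below. The variables jsc, table, and taggedRecords are assumed to be in scope, and OverwriteWithLatestAvroPayload stands in for the actual payload type.

import org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.spark.api.java.JavaRDD;

import java.util.HashSet;
import java.util.stream.Collectors;

// Sketch: collect file groups with pending clustering plans, then vet the tagged records.
HashSet<HoodieFileGroupId> pendingClustering = table.getFileSystemView()
    .getFileGroupsInPendingClustering()
    .map(pair -> pair.getKey())
    .collect(Collectors.toCollection(HashSet::new));

SparkRejectUpdateStrategy<OverwriteWithLatestAvroPayload> strategy =
    new SparkRejectUpdateStrategy<>(new HoodieSparkEngineContext(jsc), pendingClustering);

// Throws HoodieClusteringUpdateException if any record's current location points at a
// file group in pending clustering; otherwise the RDD passes through unchanged.
JavaRDD<HoodieRecord<OverwriteWithLatestAvroPayload>> checked = strategy.handleUpdate(taggedRecords);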
BaseSparkCommitActionExecutor.java
@@ -28,11 +28,13 @@
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
@@ -46,6 +48,7 @@
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -59,11 +62,13 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.stream.Collectors;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Map;

public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
@@ -88,6 +93,18 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
super(context, config, table, instantTime, operationType, extraMetadata);
}

private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
if (config.isClusteringEnabled()) {
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
return (JavaRDD<HoodieRecord<T>>)updateStrategy.handleUpdate(inputRecordsRDD);
} else {
return inputRecordsRDD;
}
}

@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
@@ -107,11 +124,12 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>
saveWorkloadProfileMetadataToInflight(profile, instantTime);
}

// use profile to check clustering update
this.clusteringUpdateCheck(profile);
// handle records update with clustering
JavaRDD<HoodieRecord<T>> inputRecordsRDDWithClusteringUpdate = clusteringHandleUpdate(inputRecordsRDD);

// partition using the insert partitioner
final Partitioner partitioner = getPartitioner(profile);
JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDD, partitioner);
JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDDWithClusteringUpdate, partitioner);
JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
if (WriteOperationType.isChangingRecords(operationType)) {
return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
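Putting the pieces together, the observable effect for a Spark writer is sketched below. Here writeClient, records, and instantTime are assumed to exist, and the exact exception propagation may be wrapped differently than in this simplified view.

// Sketch: with the default SparkRejectUpdateStrategy configured, an upsert whose tagged
// records land on a file group with a pending clustering plan is rejected before
// partitioning and writing begin. (Imports omitted; see the classes in the diffs above.)
try {
  JavaRDD<WriteStatus> statuses = writeClient.upsert(records, instantTime);
} catch (HoodieClusteringUpdateException e) {
  // Either retry after the pending clustering instant completes, or configure a more
  // permissive strategy via hoodie.clustering.updates.strategy.
}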
(remaining file diffs not loaded)
