[HUDI-1347]fix Hbase index partition changes cause data duplication p… #2188

Merged 4 commits on Feb 24, 2021
@@ -113,6 +113,13 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
public static final String HBASE_INDEX_UPDATE_PARTITION_PATH = "hoodie.hbase.index.update.partition.path";
public static final Boolean DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH = false;

/**
 * When set to true, the rollback method deletes the index entries written by the failed commit.
 * The default value is false, because deleting index entries adds extra load on the HBase cluster for each rollback.
 */
public static final String HBASE_INDEX_ROLLBACK_SYNC = "hoodie.index.hbase.rollback.sync";
public static final Boolean DEFAULT_HBASE_INDEX_ROLLBACK_SYNC = false;

public HoodieHBaseIndexConfig(final Properties props) {
super(props);
}
@@ -212,6 +219,11 @@ public Builder hbaseIndexUpdatePartitionPath(boolean updatePartitionPath) {
return this;
}

public Builder hbaseIndexRollbackSync(boolean rollbackSync) {
props.setProperty(HBASE_INDEX_ROLLBACK_SYNC, String.valueOf(rollbackSync));
return this;
}

public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) {
props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass);
return this;
@@ -277,6 +289,8 @@ public HoodieHBaseIndexConfig build() {
String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_UPDATE_PARTITION_PATH), HBASE_INDEX_UPDATE_PARTITION_PATH,
String.valueOf(DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH));
setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_ROLLBACK_SYNC), HBASE_INDEX_ROLLBACK_SYNC,
String.valueOf(DEFAULT_HBASE_INDEX_ROLLBACK_SYNC));
return config;
}

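For reference, a minimal usage sketch of how the new flag can be enabled together with partition-path updates. Only hbaseIndexRollbackSync(...) is introduced by this PR; the other setters appear in the existing builder above, and HoodieHBaseIndexConfig.newBuilder() is assumed as the builder entry point.

    // Sketch only: assumes HoodieHBaseIndexConfig.newBuilder(); adjust the ZK quorum
    // and table name for your cluster.
    HoodieHBaseIndexConfig hbaseIndexConfig = HoodieHBaseIndexConfig.newBuilder()
        .hbaseZkQuorum("localhost")
        .hbaseTableName("hudi_record_index")
        .hbaseIndexUpdatePartitionPath(true)  // let index entries follow partition changes
        .hbaseIndexRollbackSync(true)         // equivalent to hoodie.index.hbase.rollback.sync=true
        .build();

The same pair of settings can also be supplied as raw properties: hoodie.index.hbase.rollback.sync=true and hoodie.hbase.index.update.partition.path=true.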
@@ -557,6 +557,10 @@ public int getHbaseIndexGetBatchSize() {
return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP));
}

public Boolean getHBaseIndexRollbackSync() {
return Boolean.parseBoolean(props.getProperty(HoodieHBaseIndexConfig.HBASE_INDEX_ROLLBACK_SYNC));
}

public int getHbaseIndexPutBatchSize() {
return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP));
}
@@ -18,6 +18,8 @@

package org.apache.hudi.index.hbase;

import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkMemoryUtils;
@@ -28,6 +30,7 @@
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.RateLimiter;
@@ -67,6 +70,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -181,6 +185,10 @@ private Get generateStatement(String key) throws IOException {
.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN).addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN);
}

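/**
 * Same as {@link #generateStatement(String)}, but restricted to HBase cell timestamps in [startTime, endTime).
 */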
private Get generateStatement(String key, long startTime, long endTime) throws IOException {
return generateStatement(key).setTimeRange(startTime, endTime);
}

private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
// Check if the last commit ts for this row is 1) present in the timeline or
@@ -537,7 +545,72 @@ private Integer getNumRegionServersAliveForTable() {

@Override
public boolean rollbackCommit(String instantTime) {
// Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
boolean rollbackSync = config.getHBaseIndexRollbackSync();

if (!config.getHBaseIndexRollbackSync()) {
// Default Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
return true;
}

synchronized (SparkHoodieHBaseIndex.class) {
hj2016 marked this conversation as resolved.

Contributor:
Can you remove the comment on line 488?

Contributor Author:
ok

Contributor:
@hj2016 Can you please make the following change:

    public boolean rollbackCommit(String instantTime) {
      if (config.getHBaseIndexRollbackSync()) {
        // ...
      }
      return true;
    }

This keeps the old behavior unchanged and safe, and allows you to control deletes via the config.

Contributor Author:
That would still cause problems with the HBase index. The scenario that needs the rollback is this: partition-path updates are enabled and a commit fails after the HBase index has already been written (for example, because of a JVM out-of-memory error or a sudden HBase crash). Say the data starts as id:1, partition:2019; a later commit writes the index to HBase and then fails, so the index now points to partition 2020. The next time this data is written, it goes only to the 2020 partition, resulting in duplicate data. If the guard is based only on the rollbackSync parameter, the logic below is skipped, so setting hoodie.index.hbase.rollback.sync = false together with hoodie.hbase.index.update.partition.path = true still leaves the problem. I think it would be more reasonable to write it like this:

    if (!config.getHbaseIndexUpdatePartitionPath()) {
      return true;
    }
    synchronized (SparkHoodieHBaseIndex.class) {
      // ...
    }
    return true;

because the problem can only occur when the partition path changes.

Contributor:
@n3nash: looks like the author is waiting for your response.

if (hbaseConnection == null || hbaseConnection.isClosed()) {
hbaseConnection = getHBaseConnection();
}
}
try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS);

Long rollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime();
Long currentTime = new Date().getTime();
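// Scan every index row whose cell timestamp falls in [rollbackTime, now), i.e. rows written by the rolled-back commit or later.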
Scan scan = new Scan();
scan.addFamily(SYSTEM_COLUMN_FAMILY);
scan.setTimeRange(rollbackTime, currentTime);
ResultScanner scanner = hTable.getScanner(scan);
Iterator<Result> scannerIterator = scanner.iterator();

List<Get> statements = new ArrayList<>();
List<Result> currentVersionResults = new ArrayList<Result>();
List<Mutation> mutations = new ArrayList<>();
while (scannerIterator.hasNext()) {
Result result = scannerIterator.next();
currentVersionResults.add(result);
statements.add(generateStatement(Bytes.toString(result.getRow()), 0L, rollbackTime - 1));

if (scannerIterator.hasNext() && statements.size() < multiGetBatchSize) {
continue;
}
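// The batch is full (or the scan is exhausted): fetch these rows as they existed before the rolled-back instant.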
Result[] lastVersionResults = hTable.get(statements);
for (int i = 0; i < lastVersionResults.length; i++) {
Result lastVersionResult = lastVersionResults[i];
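// If no earlier version exists, the row was created by the rolled-back commit and is deleted; otherwise the earlier commit ts, file name and partition path are restored.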
if (null == lastVersionResult.getRow() && rollbackSync) {
Result currentVersionResult = currentVersionResults.get(i);
Delete delete = new Delete(currentVersionResult.getRow());
mutations.add(delete);
}

if (null != lastVersionResult.getRow()) {
String oldPath = new String(lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
String nowPath = new String(currentVersionResults.get(i).getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
if (!oldPath.equals(nowPath) || rollbackSync) {
Put put = new Put(lastVersionResult.getRow());
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
mutations.add(put);
}
}
}
doMutations(mutator, mutations, limiter);
currentVersionResults.clear();
statements.clear();
mutations.clear();
}
} catch (Exception e) {
LOG.error("hbase index roll back failed", e);
return false;
}
return true;
}

@@ -117,7 +117,7 @@ public static void init() throws Exception {
utility = new HBaseTestingUtility(hbaseConfig);
utility.startMiniCluster();
hbaseConfig = utility.getConnection().getConfiguration();
utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s"));
utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s"), 2); // keep 2 cell versions so the rollback can read the previous index entry
}

@BeforeEach
@@ -198,8 +198,8 @@ public void testTagLocationAndPartitionPathUpdate() throws Exception {
JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);

HoodieWriteConfig config = getConfig(true);
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(getConfig(true));
HoodieWriteConfig config = getConfig(true, false);
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(getConfig(true, false));

try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
// allowed path change test
@@ -225,7 +225,7 @@
assertEquals(numRecords, taggedRecords.stream().filter(record -> !record.getKey().getPartitionPath().equals(oldPartitionPath)).count());

// not allowed path change test
index = new SparkHoodieHBaseIndex<>(getConfig(false));
index = new SparkHoodieHBaseIndex<>(getConfig(false, false));
List<HoodieRecord> notAllowPathChangeRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
assertEquals(numRecords, notAllowPathChangeRecords.stream().count());
assertEquals(numRecords, taggedRecords.stream().filter(hoodieRecord -> hoodieRecord.isCurrentLocationKnown()
@@ -272,6 +272,66 @@ public void testTagLocationAndDuplicateUpdate() throws Exception {
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
}

@Test
public void testTagLocationAndPartitionPathUpdateWithExplicitRollback() throws Exception {
final int numRecords = 10;
final String oldPartitionPath = "1970/01/01";
final String emptyHoodieRecordPayloadClasssName = EmptyHoodieRecordPayload.class.getName();
HoodieWriteConfig config = getConfig(true, true);
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);

try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
final String firstCommitTime = writeClient.startCommit();
List<HoodieRecord> newRecords = dataGen.generateInserts(firstCommitTime, numRecords);
List<HoodieRecord> oldRecords = new LinkedList();
for (HoodieRecord newRecord: newRecords) {
HoodieKey key = new HoodieKey(newRecord.getRecordKey(), oldPartitionPath);
HoodieRecord hoodieRecord = new HoodieRecord(key, newRecord.getData());
oldRecords.add(hoodieRecord);
}
JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);
// first commit old record
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
List<HoodieRecord> beforeFirstTaggedRecords = index.tagLocation(oldWriteRecords, context, hoodieTable).collect();
JavaRDD<WriteStatus> oldWriteStatues = writeClient.upsert(oldWriteRecords, firstCommitTime);
index.updateLocation(oldWriteStatues, context, hoodieTable);
writeClient.commit(firstCommitTime, oldWriteStatues);
List<HoodieRecord> afterFirstTaggedRecords = index.tagLocation(oldWriteRecords, context, hoodieTable).collect();

metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(config, context, metaClient);
final String secondCommitTime = writeClient.startCommit();
List<HoodieRecord> beforeSecondTaggedRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
JavaRDD<WriteStatus> newWriteStatues = writeClient.upsert(newWriteRecords, secondCommitTime);
index.updateLocation(newWriteStatues, context, hoodieTable);
writeClient.commit(secondCommitTime, newWriteStatues);
List<HoodieRecord> afterSecondTaggedRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
writeClient.rollback(secondCommitTime);
List<HoodieRecord> afterRollback = index.tagLocation(newWriteRecords, context, hoodieTable).collect();

// Verify the first commit
assertEquals(numRecords, beforeFirstTaggedRecords.stream().filter(record -> record.getCurrentLocation() == null).count());
assertEquals(numRecords, afterFirstTaggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
// Verify the second commit
assertEquals(numRecords, beforeSecondTaggedRecords.stream()
.filter(record -> record.getKey().getPartitionPath().equals(oldPartitionPath)
&& record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClasssName)).count());
assertEquals(numRecords * 2, beforeSecondTaggedRecords.stream().count());
assertEquals(numRecords, afterSecondTaggedRecords.stream().count());
assertEquals(numRecords, afterSecondTaggedRecords.stream().filter(record -> !record.getKey().getPartitionPath().equals(oldPartitionPath)).count());
// Verify the rollback
// If an exception occurs after the HBase index has been written and the index is not rolled back,
// the location information from the first commit would no longer be returned here.
assertEquals(numRecords, afterRollback.stream().filter(record -> record.getKey().getPartitionPath().equals(oldPartitionPath)
&& record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClasssName)).count());
assertEquals(numRecords * 2, beforeSecondTaggedRecords.stream().count());
assertEquals(numRecords, afterRollback.stream().filter(HoodieRecord::isCurrentLocationKnown)
.filter(record -> record.getCurrentLocation().getInstantTime().equals(firstCommitTime)).count());
}
}

@Test
public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
// Load to memory
@@ -413,7 +473,7 @@ public void testHbaseTagLocationForArchivedCommits() throws Exception {
params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, "1");
params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP, "3");
params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP, "2");
HoodieWriteConfig config = getConfigBuilder(100, false).withProps(params).build();
HoodieWriteConfig config = getConfigBuilder(100, false, false).withProps(params).build();

SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
@@ -723,18 +783,18 @@ private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpda
}

private HoodieWriteConfig getConfig() {
return getConfigBuilder(100, false).build();
return getConfigBuilder(100, false, false).build();
}

private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) {
return getConfigBuilder(hbaseIndexBatchSize, false).build();
return getConfigBuilder(hbaseIndexBatchSize, false, false).build();
}

private HoodieWriteConfig getConfig(boolean updatePartitionPath) {
return getConfigBuilder(100, updatePartitionPath).build();
private HoodieWriteConfig getConfig(boolean updatePartitionPath, boolean rollbackSync) {
return getConfigBuilder(100, updatePartitionPath, rollbackSync).build();
}

private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath) {
private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath, boolean rollbackSync) {
return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(1, 1).withDeleteParallelism(1)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
@@ -749,6 +809,7 @@ private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, bool
.hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME)
.hbaseIndexUpdatePartitionPath(updatePartitionPath)
.hbaseIndexRollbackSync(rollbackSync)
.hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
.build());
}