Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-1347]fix Hbase index partition changes cause data duplication p… #2188

Merged
merged 4 commits into from
Feb 24, 2021

Conversation

hj2016
Copy link
Contributor

@hj2016 hj2016 commented Oct 18, 2020

…roblems

Tips

What is the purpose of the pull request

fix Hbase index partition changes cause data duplication problems

Brief change log

(for example:)

  • 1.Fixed the error of partition information in HoodieRecord key caused by deduplication operation
  • 2.The hbase index adds a rollback operation instead of doing nothing. The partition change needs to be rolledback to the index of the last successful commit。

Verify this pull request

  • Add test for partition change rollback scenario

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@codecov-io
Copy link

codecov-io commented Oct 18, 2020

Codecov Report

Merging #2188 (0c3a394) into master (2efd076) will decrease coverage by 59.77%.
The diff coverage is n/a.

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #2188       +/-   ##
============================================
- Coverage     69.46%   9.69%   -59.78%     
+ Complexity      356      48      -308     
============================================
  Files            53      53               
  Lines          1929    1929               
  Branches        230     230               
============================================
- Hits           1340     187     -1153     
- Misses          456    1729     +1273     
+ Partials        133      13      -120     
Flag Coverage Δ Complexity Δ
hudiutilities 9.69% <ø> (-59.78%) 0.00 <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ Complexity Δ
...va/org/apache/hudi/utilities/IdentitySplitter.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-2.00%)
...va/org/apache/hudi/utilities/schema/SchemaSet.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-3.00%)
...a/org/apache/hudi/utilities/sources/RowSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-4.00%)
.../org/apache/hudi/utilities/sources/AvroSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-1.00%)
.../org/apache/hudi/utilities/sources/JsonSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-1.00%)
...rg/apache/hudi/utilities/sources/CsvDFSSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-10.00%)
...g/apache/hudi/utilities/sources/JsonDFSSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-4.00%)
...apache/hudi/utilities/sources/JsonKafkaSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-6.00%)
...pache/hudi/utilities/sources/ParquetDFSSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-5.00%)
...lities/schema/SchemaProviderWithPostProcessor.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-4.00%)
... and 30 more

Copy link
Contributor

@n3nash n3nash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hj2016 Left some comments, needs clarification.
@v3nkatesh please review this as well

@hj2016 hj2016 force-pushed the fix_hbase_index_change branch from ee843ed to 8cb7225 Compare November 2, 2020 13:41
@hj2016
Copy link
Contributor Author

hj2016 commented Nov 3, 2020

@hj2016 Left some comments, needs clarification.
@v3nkatesh please review this as well

@n3nash can yon review my pr?

@umehrot2
Copy link
Contributor

umehrot2 commented Nov 3, 2020

cc @rmpifer good candidate for you to review

@vinothchandar
Copy link
Member

@n3nash @v3nkatesh could you please review this again?

@vinothchandar
Copy link
Member

@n3nash could you please rebase and take care of landing this?
@v3nkatesh your review would be appreciated, to ensure nothing regresses for you folks at uber

@vinothchandar
Copy link
Member

cc @satishkotha @nbalajee as well, in case one of you have cycles.

@leesf
Copy link
Contributor

leesf commented Dec 26, 2020

@hj2016 would you please rebase to master? and please review @n3nash when free.

@@ -62,7 +62,7 @@ public static SparkWriteHelper newInstance() {
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
return new HoodieRecord<T>(rec1.getKey(), reducedData);
return new HoodieRecord<T>(rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey(), reducedData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why this is required. this is within reduceByKey and so rec1.getKey and rec2.getKey should be same right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, there are two data with the same primary key for upsert
id partitionPath updateTime
1 2018 2019-01-01
1 2019 2019-02-01
After the data is deduplicated,
Expected return: (1,2019)->(1,2019,2019-02-01)
Actual return: (1,2018)->(1,2019,2019-02-01)
In this way, the hoodile key and the data content will be inconsistent, resulting in writing to the wrong partition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I was resolving conflicts, it seemed that someone encountered a similar problem. #2248

@@ -480,6 +486,68 @@ private Integer getNumRegionServersAliveForTable() {
@Override
public boolean rollbackCommit(String instantTime) {
// Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
synchronized (SparkHoodieHBaseIndex.class) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you remove comment in line 488.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hj2016 Can you please make the following changes :

`public boolean rollbackCommit(String instantTime) {

if (config.getHBaseIndexRollbackSync()) {
//
}
return true;
`
This keeps old behavior unchanged and safe and allows you to control deletes via the config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be problems with the hbase index. The scenario that needs to be rolled back is that the hbase partition change is turned on and an error is reported after the hbase index is written for some reasons (some reasons may be due to jvm memory overflow, hbase suddenly crashes), for example, At the beginning, the data was id:1 partition:2019, and then another commit failed and the index was written to hbase. At this time, the index partition was changed to 2020. So the next time the data is written, it will only be written to In the 2020 partition, resulting in data duplication. After judging based on the rollbackSync parameter, the following logic will not be executed. If you set hbase.index.rollback.sync = false, hoodie.hbase.index.update.partition.path = true, there will still be problems. I think it would be more reasonable to write like this:

if (!config.getHbaseIndexUpdatePartitionPath()){
return true;
}
synchronized (SparkHoodieHBaseIndex.class) {
....
}
return true;

Because only when the partition changes, problems may occur.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@n3nash : looks like author is waiting for your response.

if (null == lastVersionResult.getRow() && rollbackSync) {
Result currentVersionResult = currentVersionResults.get(i);
Delete delete = new Delete(currentVersionResult.getRow());
// delete.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, currentTime);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why commented out code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I thought it was necessary to specify the column to delete, I later found that it is not necessary to specify the column. I think I can delete the comment code.

@@ -263,6 +265,66 @@ public void testTagLocationAndDuplicateUpdate() throws Exception {
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
}

@Test
public void testTagLocationAndPartitionPathUpdateWithRollback() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so can you confirm that this test fails if not for the fix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

@@ -263,6 +265,66 @@ public void testTagLocationAndDuplicateUpdate() throws Exception {
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
}

@Test
public void testTagLocationAndPartitionPathUpdateWithRollback() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename this to `testTagLocationAndPartitionPathUpdateWithExplicitRollback"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor

@n3nash n3nash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hj2016 Can you please resolve @nsivabalan comments and one from me please, we want to merge this before the 0.7.0 release

@hj2016 hj2016 force-pushed the fix_hbase_index_change branch from 8cb7225 to 1573c9d Compare December 29, 2020 01:32
@hj2016
Copy link
Contributor Author

hj2016 commented Dec 29, 2020

@n3nash @nsivabalan @leesf I have completed the suggested changes above. You can review if there are other problems.

Copy link
Contributor

@n3nash n3nash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hj2016 Thanks for your changes and patience. I've asked for 1 more change which is necessary to ensure we don't cause any regression, can you ptal ?

@hj2016
Copy link
Contributor Author

hj2016 commented Jan 5, 2021

@n3nash How do you feel about my suggestion?

@nsivabalan nsivabalan added the priority:major degraded perf; unable to move forward; potential bugs label Feb 11, 2021
@nsivabalan
Copy link
Contributor

nsivabalan commented Feb 18, 2021

@n3nash : Can you please take this across the finish line. We tried to get this into previous release itself. Don't wanna scramble again for next release. Appreciate if you can spend some time on this.

@hj2016
Copy link
Contributor Author

hj2016 commented Feb 20, 2021

@nsivabalan Maybe n3nash is busy and hasn't responded to me. I first modified it according to my ideas. You can confirm whether you can merge. Leave me a message if you have any questions.

@hj2016 hj2016 force-pushed the fix_hbase_index_change branch from 1573c9d to c904625 Compare February 20, 2021 09:18
int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
boolean rollbackSync = config.getHBaseIndexRollbackSync();

if (!config.getHbaseIndexUpdatePartitionPath()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getHBaseIndexRollbackSync

final String oldPartitionPath = "1970/01/01";
final String emptyHoodieRecordPayloadClasssName = EmptyHoodieRecordPayload.class.getName();
HoodieWriteConfig config = getConfig(true);
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(getConfig(true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you point me to the place where we set the new config to true.

Copy link
Contributor

@nsivabalan nsivabalan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have pushed some small change by myself. I will ping nishith to see if he can take a quick look and land this

@nsivabalan nsivabalan force-pushed the fix_hbase_index_change branch from d2407cb to e9bef5a Compare February 23, 2021 13:49
@nsivabalan
Copy link
Contributor

@n3nash : I have updated the patch w/ some minor fixes. Will land this in once CI succeeds.

@nsivabalan nsivabalan merged commit 77ba561 into apache:master Feb 24, 2021
@codecov-commenter
Copy link

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 51.19%. Comparing base (2efd076) to head (0c3a394).
Report is 4476 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff            @@
##             master    #2188   +/-   ##
=========================================
  Coverage     51.18%   51.19%           
  Complexity     3226     3226           
=========================================
  Files           438      438           
  Lines         20089    20089           
  Branches       2068     2068           
=========================================
+ Hits          10283    10285    +2     
+ Misses         8959     8958    -1     
+ Partials        847      846    -1     
Flag Coverage Δ
hudicli 36.87% <ø> (ø)
hudiclient ∅ <ø> (∅)
hudicommon 51.37% <ø> (+0.02%) ⬆️
hudiflink 46.38% <ø> (ø)
hudihadoopmr 33.16% <ø> (ø)
hudisparkdatasource 69.75% <ø> (ø)
hudisync 48.61% <ø> (ø)
huditimelineservice 66.49% <ø> (ø)
hudiutilities 69.46% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

see 1 file with indirect coverage changes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
priority:major degraded perf; unable to move forward; potential bugs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants