[HUDI-1347] Fix HBase index partition changes cause data duplication p… #2188
Conversation
Codecov Report

```
@@             Coverage Diff              @@
##             master    #2188      +/-   ##
============================================
- Coverage     69.46%    9.69%    -59.78%
+ Complexity      356       48       -308
============================================
  Files            53       53
  Lines          1929     1929
  Branches        230      230
============================================
- Hits           1340      187      -1153
- Misses          456     1729      +1273
+ Partials        133       13       -120
```
@hj2016 Left some comments, needs clarification.
@v3nkatesh please review this as well
Force-pushed from ee843ed to 8cb7225
@n3nash can you review my PR?
cc @rmpifer good candidate for you to review
@n3nash @v3nkatesh could you please review this again?
@n3nash could you please rebase and take care of landing this?
cc @satishkotha @nbalajee as well, in case one of you has cycles.
```diff
@@ -62,7 +62,7 @@ public static SparkWriteHelper newInstance() {
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
-      return new HoodieRecord<T>(rec1.getKey(), reducedData);
+      return new HoodieRecord<T>(rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey(), reducedData);
```
Not sure why this is required. This is within reduceByKey, so `rec1.getKey()` and `rec2.getKey()` should be the same, right?
For example, suppose two records with the same primary key are upserted:

id  partitionPath  updateTime
1   2018           2019-01-01
1   2019           2019-02-01

After deduplication:
Expected return: (1,2019) -> (1,2019,2019-02-01)
Actual return: (1,2018) -> (1,2019,2019-02-01)
This leaves the HoodieKey and the record payload inconsistent, so the record is written to the wrong partition.
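The key/payload mismatch can be sketched with tiny stand-ins for HoodieKey and HoodieRecord (the classes and the `reduce` function below are illustrative, not Hudi's actual API; the merge simply assumes the second record's payload is newer):

```java
public class DedupSketch {
    // Minimal stand-ins for HoodieKey / HoodieRecord; illustrative only.
    record Key(String recordKey, String partitionPath) {}
    record Rec(Key key, String data) {}

    // Simplified merge as it would happen inside reduceByKey: assume rec2's
    // payload wins because it carries the later update time.
    static Rec reduce(Rec rec1, Rec rec2) {
        String reducedData = rec2.data();
        // Old behavior: always keep rec1.key(), pairing the 2018 key with
        // the 2019 payload. The fix picks the key of whichever record's
        // payload actually survived the merge.
        Key key = rec1.data().equals(reducedData) ? rec1.key() : rec2.key();
        return new Rec(key, reducedData);
    }

    public static void main(String[] args) {
        Rec r1 = new Rec(new Key("1", "2018"), "2019-01-01");
        Rec r2 = new Rec(new Key("1", "2019"), "2019-02-01");
        Rec merged = reduce(r1, r2);
        System.out.println(merged.key().partitionPath() + " " + merged.data());
        // prints "2019 2019-02-01" -- key and payload now agree
    }
}
```

With the ternary removed (always `rec1.key()`), the same input would yield the inconsistent pair (1,2018) -> 2019-02-01 described above.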
When I was resolving conflicts, it seemed that someone encountered a similar problem. #2248
```diff
@@ -480,6 +486,68 @@ private Integer getNumRegionServersAliveForTable() {
   @Override
   public boolean rollbackCommit(String instantTime) {
     // Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
+    synchronized (SparkHoodieHBaseIndex.class) {
```
Can you remove the comment on line 488?
ok
@hj2016 Can you please make the following change:

```java
public boolean rollbackCommit(String instantTime) {
  if (config.getHBaseIndexRollbackSync()) {
    // ...
  }
  return true;
}
```

This keeps the old behavior unchanged and safe, and lets you control deletes via the config.
That will still cause problems with the HBase index. The scenario that needs rollback is this: partition-path updates are enabled, and a commit fails after the index has already been written to HBase (for example, due to a JVM out-of-memory error, or HBase crashing). Say the data is initially id:1, partition:2019; a later commit writes the index, changing the indexed partition to 2020, and then fails. From then on, writes for that record only go to the 2020 partition, producing duplicate data. If we gate on the rollbackSync parameter alone, the delete logic below is skipped, so with hbase.index.rollback.sync = false and hoodie.hbase.index.update.partition.path = true the problem remains. I think it would be more reasonable to write:

```java
if (!config.getHbaseIndexUpdatePartitionPath()) {
  return true;
}
synchronized (SparkHoodieHBaseIndex.class) {
  // ...
}
return true;
```

Problems can only occur when the partition path changes.
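The failure scenario and the rollback it motivates can be modeled with a toy in-memory "index" (a map of record key to partition path, standing in for the HBase table; this sketch is illustrative, not Hudi's actual code):

```java
import java.util.HashMap;
import java.util.Map;

public class RollbackSketch {
    // Stand-in for the HBase index table: record key -> partition path.
    final Map<String, String> index = new HashMap<>();
    // Each key's pre-commit partition, snapshotted on first write.
    final Map<String, String> beforeFailedCommit = new HashMap<>();

    void write(String recordKey, String partition) {
        if (!beforeFailedCommit.containsKey(recordKey)) {
            beforeFailedCommit.put(recordKey, index.get(recordKey));
        }
        index.put(recordKey, partition);
    }

    // Rollback only matters when partition-path updates are enabled;
    // otherwise the indexed partition for a key can never change.
    boolean rollbackCommit(boolean updatePartitionPath) {
        if (!updatePartitionPath) {
            return true;
        }
        for (Map.Entry<String, String> e : beforeFailedCommit.entrySet()) {
            if (e.getValue() == null) {
                index.remove(e.getKey());            // row created by the failed commit
            } else {
                index.put(e.getKey(), e.getValue()); // restore the old partition
            }
        }
        return true;
    }

    public static void main(String[] args) {
        RollbackSketch rb = new RollbackSketch();
        rb.index.put("1", "2019");  // committed: id 1 lives in partition 2019
        rb.write("1", "2020");      // failed commit re-points the index to 2020
        rb.rollbackCommit(true);    // undo, so the next upsert tags 2019 again
        System.out.println(rb.index.get("1")); // prints "2019"
    }
}
```

Without the rollback (or with the gate returning early), the index would keep pointing at 2020 and the next upsert would write a duplicate row into the wrong partition.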
@n3nash : looks like author is waiting for your response.
```java
if (null == lastVersionResult.getRow() && rollbackSync) {
  Result currentVersionResult = currentVersionResults.get(i);
  Delete delete = new Delete(currentVersionResult.getRow());
  // delete.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, currentTime);
```
Why is this code commented out?
I initially thought it was necessary to specify the column to delete, but later found that it isn't: deleting the whole row is enough. I'll remove the commented-out code.
```diff
@@ -263,6 +265,66 @@ public void testTagLocationAndDuplicateUpdate() throws Exception {
         && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
   }

+  @Test
+  public void testTagLocationAndPartitionPathUpdateWithRollback() throws Exception {
```
so can you confirm that this test fails if not for the fix?
yes
```diff
@@ -263,6 +265,66 @@ public void testTagLocationAndDuplicateUpdate() throws Exception {
         && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
   }

+  @Test
+  public void testTagLocationAndPartitionPathUpdateWithRollback() throws Exception {
```
Please rename this to `testTagLocationAndPartitionPathUpdateWithExplicitRollback`
ok
@hj2016 Can you please resolve @nsivabalan's comments and the one from me? We want to merge this before the 0.7.0 release.
Force-pushed from 8cb7225 to 1573c9d
@n3nash @nsivabalan @leesf I have completed the suggested changes above. Please review in case there are other problems.
@hj2016 Thanks for your changes and patience. I've asked for one more change, which is necessary to ensure we don't cause any regression. Can you PTAL?
@n3nash How do you feel about my suggestion?
@n3nash: Can you please take this across the finish line? We tried to get this into the previous release itself. Don't wanna scramble again for the next release. Appreciate it if you can spend some time on this.
@nsivabalan Maybe n3nash is busy and hasn't responded to me. For now I've modified it according to my own ideas. You can confirm whether it can be merged. Leave me a message if you have any questions.
Force-pushed from 1573c9d to c904625
```java
int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
boolean rollbackSync = config.getHBaseIndexRollbackSync();

if (!config.getHbaseIndexUpdatePartitionPath()) {
```
`getHBaseIndexRollbackSync`
```java
final String oldPartitionPath = "1970/01/01";
final String emptyHoodieRecordPayloadClasssName = EmptyHoodieRecordPayload.class.getName();
HoodieWriteConfig config = getConfig(true);
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(getConfig(true));
```
Can you point me to the place where we set the new config to true?
I have pushed some small change by myself. I will ping nishith to see if he can take a quick look and land this
Force-pushed from d2407cb to e9bef5a
@n3nash: I have updated the patch with some minor fixes. Will land this once CI succeeds.
Codecov Report

All modified and coverable lines are covered by tests ✅

```
@@            Coverage Diff             @@
##             master    #2188   +/-   ##
=========================================
  Coverage     51.18%   51.19%
  Complexity     3226     3226
=========================================
  Files           438      438
  Lines         20089    20089
  Branches       2068     2068
=========================================
+ Hits          10283    10285       +2
+ Misses         8959     8958       -1
+ Partials        847      846       -1
```
What is the purpose of the pull request

Fix HBase index partition changes causing data duplication.

Brief change log

Verify this pull request
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.