[HUDI-1722] Hive beeline/spark-sql query of specified fields on a MOR table throws NPE #2722

Merged — 1 commit, May 12, 2021

Conversation

@xiarixiaoyao (Contributor) commented Mar 25, 2021


What is the purpose of the pull request

Fix a bug introduced by HUDI-892. That PR skips adding the Hudi projection columns when there are no log files in the hoodieRealtimeSplit, but it does not consider that multiple getRecordReaders share the same jobConf.

Consider the following scenario with four getRecordReaders:

reader1 (its hoodieRealtimeSplit contains no log files)
reader2 (its hoodieRealtimeSplit contains log files)
reader3 (its hoodieRealtimeSplit contains log files)
reader4 (its hoodieRealtimeSplit contains no log files)

Now reader1 runs first: HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP in the jobConf is set to true, and no additional Hudi projection columns are added to the jobConf (see HoodieParquetRealtimeInputFormat.addProjectionToJobConf).

When reader2 runs later, HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP in the jobConf is already true, so no additional Hudi projection columns are added for it either (see HoodieParquetRealtimeInputFormat.addProjectionToJobConf). As a result, _hoodie_record_key is missing from the projection and the merge step throws:

2021-03-25 20:23:14,014 | INFO | AsyncDispatcher event handler | Diagnostics report from attempt_1615883368881_0038_m_000000_0: Error: java.lang.NullPointerException
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:101)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:43)
    at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.next(HoodieRealtimeRecordReader.java:79)
    at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.next(HoodieRealtimeRecordReader.java:36)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:92)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:43)
    at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.next(HoodieRealtimeRecordReader.java:79)
    at org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader.next(HoodieCombineRealtimeRecordReader.java:68)
    at org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader.next(HoodieCombineRealtimeRecordReader.java:77)
    at org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader.next(HoodieCombineRealtimeRecordReader.java:42)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:205)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:191)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:52)
    at org.apache.hadoop.hive.ql.exec.mr.ExecMapRunner.run(ExecMapRunner.java:37)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.YarnChild$1.run(YarnChild.java:183)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1761)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:177)

Obviously, this is an intermittent problem: if reader2 happened to run first, the Hudi projection columns would be added to the jobConf and the query would succeed.
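To make the ordering dependence concrete, here is a minimal, self-contained sketch of the pre-patch flow. It is illustrative only: a plain Map stands in for the shared JobConf, and the constant values and method shape are assumptions, not the actual Hudi code.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only -- NOT the actual Hudi source. A plain Map stands in
// for the single JobConf that all record readers of one query share; the
// property names are stand-ins mirroring the real ones.
public class SharedJobConfOrderingSketch {

  static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set"; // assumed value
  static final String PROJECTED_COLUMNS = "projected.columns";              // assumed key

  // Pre-patch behavior of addProjectionToJobConf, reduced to the bug: the first
  // reader to arrive marks the conf as handled, whether or not it actually
  // added the Hudi meta columns.
  static void addProjectionToJobConf(boolean splitHasLogFiles, Map<String, String> jobConf) {
    if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
      if (splitHasLogFiles) {
        jobConf.merge(PROJECTED_COLUMNS, "_hoodie_record_key", (a, b) -> a + "," + b);
      }
      jobConf.put(HOODIE_READ_COLUMNS_PROP, "true");
    }
  }

  public static void main(String[] args) {
    Map<String, String> jobConf = new HashMap<>();
    addProjectionToJobConf(false, jobConf); // reader1: split without log files runs first
    addProjectionToJobConf(true, jobConf);  // reader2: split with log files runs later
    // _hoodie_record_key was never projected, so the merge step NPEs at runtime.
    System.out.println("projected columns: " + jobConf.get(PROJECTED_COLUMNS)); // prints: null
  }
}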

Brief change log

  • Fix HoodieParquetRealtimeInputFormat.addProjectionToJobConf to add the required projection fields when a split contains log files but the fields are missing from the shared jobConf

  • Add a unit test covering the record-reader creation order

Verify this pull request

This pull request is verified by a unit test added during review (see the discussion below).

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@xiarixiaoyao (Contributor, Author) commented Mar 25, 2021

Test steps.

Before the patch:

Step 1:

val df = spark.range(0, 100000).toDF("keyid")
.withColumn("col3", expr("keyid"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit(Array[String] ("sb1", "rz")))
.withColumn("a2", lit(Array[String] ("sb1", "rz")))

// create hoodie table hive_14b

merge(df, 4, "default", "hive_14b", DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")

Note: bulk_insert produces 4 base files in the Hudi table.

Step 2:

val df = spark.range(99999, 100002).toDF("keyid")
.withColumn("col3", expr("keyid"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit(Array[String] ("sb1", "rz")))
.withColumn("a2", lit(Array[String] ("sb1", "rz")))

// upsert table

merge(df, 4, "default", "hive_14b", DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")

Now we have four base files and one log file in the Hudi table.

Step 3:

spark-sql/beeline:

select count(col3) from hive_14b_rt;

The query then fails with the same NullPointerException stack trace shown in the PR description above.

After the patch:

spark-sql/hive-beeline:
select count(col3) from hive_14b_rt;
+---------+
|   _c0   |
+---------+
| 100002  |
+---------+

merge function:

// Imports assumed by this snippet (Hudi 0.8.x-era option names):
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode.{Append, Overwrite}

def merge(df: org.apache.spark.sql.DataFrame, par: Int, db: String, tableName: String,
          tableType: String = DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
          hivePartitionExtract: String = "org.apache.hudi.hive.MultiPartKeysValueExtractor",
          op: String = "upsert"): Unit = {
  // bulk_insert creates the table from scratch; all other ops append to it.
  val mode = if (op.equals("bulk_insert")) Overwrite else Append
  df.write.format("hudi").
    option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
    option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false).
    option(PRECOMBINE_FIELD_OPT_KEY, "col3").
    option(RECORDKEY_FIELD_OPT_KEY, "keyid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2").
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, op).
    option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[ComplexKeyGenerator].getName).
    option("hoodie.bulkinsert.shuffle.parallelism", par.toString).
    option("hoodie.metadata.enable", "false").
    option("hoodie.insert.shuffle.parallelism", par.toString).
    option("hoodie.upsert.shuffle.parallelism", par.toString).
    option("hoodie.delete.shuffle.parallelism", par.toString).
    option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "p,p1,p2").
    option("hoodie.datasource.hive_sync.support_timestamp", "true").
    option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, hivePartitionExtract).
    option(HIVE_USE_JDBC_OPT_KEY, "false").
    option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, db).
    option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName).
    option(TABLE_NAME, tableName).
    mode(mode).
    save(s"/tmp/db/$tableName")
}

@xiarixiaoyao (Contributor, Author):

@garyli1019 could you please help review this PR? Thanks.

@nsivabalan nsivabalan requested a review from garyli1019 March 30, 2021 19:36
@nsivabalan nsivabalan added the priority:critical production down; pipelines stalled; Need help asap. label Mar 30, 2021
@nsivabalan (Contributor):

@garyli1019: replace the "sev:critical" label with "sev:high" if applicable, and fix the corresponding JIRA as well if you had to change it.

@garyli1019 (Member):

@xiarixiaoyao thanks for your contribution. It looks like you were able to reproduce this problem in a unit test. Would it be possible to add that unit test to this PR as well?

@garyli1019 garyli1019 self-assigned this Apr 1, 2021
@xiarixiaoyao (Contributor, Author):

Thanks @garyli1019. OK, I will try to add a unit test.

@xiarixiaoyao xiarixiaoyao force-pushed the hive_npe branch 2 times, most recently from 6070c23 to 0a896f1 Compare April 9, 2021 13:12
@xiarixiaoyao (Contributor, Author):

@garyli1019 the unit test has been added; please review again, thanks.

@garyli1019 (Member) left a comment:

@xiarixiaoyao thanks for adding the test! @lw309637554 would you please review this PR as well?

@garyli1019 garyli1019 requested a review from lw309637554 April 10, 2021 14:00
@lw309637554 (Contributor):

@xiarixiaoyao hello, I will review it this weekend.

(Review comment on the following lines of the new unit test:)

InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
// Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 file groups
assertEquals(1, splits.length);
RecordReader<NullWritable, ArrayWritable> recordReader =
Reviewer (Contributor) commented:

Hello, I see only one record reader here?

@xiarixiaoyao (Contributor, Author) replied Apr 19, 2021:

Yes, we only create one combine record reader, but this reader holds three RealtimeCompactedRecordReaders. The creation order of those RealtimeCompactedRecordReaders is what triggers this NPE.

In this test, the combine reader holds three RealtimeCompactedRecordReaders; call them creader1, creader2, and creader3:

creader1: only has a base file
creader2: only has a base file
creader3: has a base file and a log file

If creader3 is created first, the Hudi projection columns are added to the jobConf and the query succeeds. However, if creader1 or creader2 is created first, no Hudi projection columns are added to the jobConf and the query fails.

Reviewer (Contributor) replied:

got it, thanks

@lw309637554 (Contributor):

@xiarixiaoyao thanks for your contribution. Adding the unit test was very necessary. I have also left some comments on the resolution.

@xiarixiaoyao (Contributor, Author) commented Apr 19, 2021:

@lw309637554 thanks for your review. I have answered your questions; please check them, thanks.

Another question: TestHoodieCombineHiveInputFormat.testHoodieRealtimeCombineHoodieInputFormat is disabled by default. I have checked that test function and found some problems in it. Could I fix those problems and enable testHoodieRealtimeCombineHoodieInputFormat by default?

@vinothchandar (Member):

@nsivabalan same over to you to get this ready for review.

@lw309637554 (Contributor):

Regarding testHoodieRealtimeCombineHoodieInputFormat: try it.

@nsivabalan (Contributor):

@vinothchandar: I see that the author is actively responding and working on the PR. I will leave it to the author to address the feedback; if we don't see any activity for some time, I can chime in.

@xiarixiaoyao (Contributor, Author):

@lw309637554 @nsivabalan thanks for your review. I will address testHoodieRealtimeCombineHoodieInputFormat in another PR, since it has nothing to do with this problem.

@garyli1019 (Member) left a comment:

LGTM

@vinothchandar (Member) left a comment:

@lw309637554 can you please let us know if you are okay with this change? This LGTM.

  synchronized (jobConf) {
    LOG.info(
        "Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
            + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-   if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
+   if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null
+       || (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf))) {
Reviewer (Member) commented:

Can we pull this check into a small util method that we can call in both places?

@xiarixiaoyao (Contributor, Author) replied:

Yes, fixed.
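For context, a hypothetical sketch of what such an extracted helper could look like. The method name projectionFieldsMissing is illustrative, not the actual Hudi code; the calls inside it are the ones quoted in the diff above.

// Hypothetical helper (name is illustrative, not the actual Hudi source): wraps
// the condition from the diff above so both call sites share one implementation.
private static boolean projectionFieldsMissing(RealtimeSplit realtimeSplit, JobConf jobConf) {
  return jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null
      || (!realtimeSplit.getDeltaLogPaths().isEmpty()
          && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf));
}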

@vinothchandar (Member):

@xiarixiaoyao could you please rebase this PR? I tried doing it myself, and it seems tricky.

@xiarixiaoyao (Contributor, Author):

@vinothchandar I will rebase this PR, thanks.

@codecov-commenter commented May 11, 2021

Codecov Report

Merging #2722 (8632d15) into master (ac72470) will increase coverage by 0.00%.
The diff coverage is 25.00%.


@@            Coverage Diff            @@
##             master    #2722   +/-   ##
=========================================
  Coverage     54.78%   54.78%           
- Complexity     3808     3812    +4     
=========================================
  Files           481      481           
  Lines         23320    23326    +6     
  Branches       2488     2492    +4     
=========================================
+ Hits          12775    12779    +4     
- Misses         9390     9392    +2     
  Partials       1155     1155           
Flag Coverage Δ Complexity Δ
hudicli 39.53% <ø> (ø) 220.00 <ø> (ø)
hudiclient ∅ <ø> (∅) 0.00 <ø> (ø)
hudicommon 50.37% <ø> (ø) 1975.00 <ø> (ø)
hudiflink 63.04% <ø> (ø) 530.00 <ø> (ø)
hudihadoopmr 51.01% <25.00%> (+0.05%) 266.00 <4.00> (+4.00)
hudisparkdatasource 73.33% <ø> (ø) 237.00 <ø> (ø)
hudisync 46.44% <ø> (ø) 144.00 <ø> (ø)
huditimelineservice 64.36% <ø> (ø) 62.00 <ø> (ø)
hudiutilities 69.59% <ø> (ø) 378.00 <ø> (ø)

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ Complexity Δ
...i/hadoop/utils/HoodieRealtimeInputFormatUtils.java 33.91% <16.66%> (-0.95%) 12.00 <2.00> (+2.00) ⬇️
...oop/realtime/HoodieParquetRealtimeInputFormat.java 81.48% <50.00%> (ø) 7.00 <2.00> (ø)
...hudi/hadoop/hive/HoodieCombineHiveInputFormat.java 44.12% <0.00%> (+0.23%) 17.00% <0.00%> (+1.00%)
...op/realtime/HoodieCombineRealtimeRecordReader.java 83.33% <0.00%> (+6.66%) 9.00% <0.00%> (+1.00%)

@xiarixiaoyao (Contributor, Author):

@vinothchandar, I have rebased this PR; please check, thanks.

@lw309637554 (Contributor):

> @lw309637554 can you please let us know if you are okay with this change? This LGTM.

@vinothchandar @xiarixiaoyao LGTM.

Labels
priority:critical production down; pipelines stalled; Need help asap.
6 participants