
rewrite_position_delete_files leads to error #8045

Closed
mcgray opened this issue Jul 11, 2023 · 7 comments · Fixed by #8059


mcgray commented Jul 11, 2023

Apache Iceberg version

1.3.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

While testing https://iceberg.apache.org/docs/latest/spark-procedures/#rewrite_position_delete_files I see the following error:

23/07/10 21:07:26 WARN RewritePositionDeleteFilesSparkAction: Failure during rewrite process for group FileGroupInfo{globalIndex=2, partitionIndex=1, partition=org.apache.iceberg.util.StructProjection@4fdf7e57}
org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES] Cannot resolve "(partition.ts_day = 19498)" due to data type mismatch: the left and right operands of the binary operator have incompatible types ("DATE" and "INT").;
'Filter ((partition#703.et = FIND.PRODUCT) AND (partition#703.ts_day = 19498))
+- RelationV2[content#699, file_path#700, file_format#701, spec_id#702, partition#703, record_count#704L, file_size_in_bytes#705L, column_sizes#706, value_counts#707, null_value_counts#708, nan_value_counts#709, lower_bounds#710, upper_bounds#711, key_metadata#712, split_offsets#713, equality_ids#714, sort_order_id#715, readable_metrics#716]  internal_catalog.fanflow2iceberg.fanflow_event.data_files
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:73)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:269)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:256)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:294)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:294)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:294)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4(CheckAnalysis.scala:256)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4$adapted(CheckAnalysis.scala:256)
	at scala.collection.immutable.Stream.foreach(Stream.scala:533)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1(CheckAnalysis.scala:256)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1$adapted(CheckAnalysis.scala:163)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:163)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:160)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:188)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:156)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:146)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:188)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:211)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)

Partition specification:

"spec-id" : 2,
    "fields" : [ {
      "name" : "et",
      "transform" : "identity",
      "source-id" : 3,
      "field-id" : 1000
    }, {
      "name" : "ts_day",
      "transform" : "day",
      "source-id" : 2,
      "field-id" : 1001

Source fields:

      "id" : 2,
      "name" : "ts",
      "required" : false,
      "type" : "timestamptz"
    }, {
      "id" : 3,
      "name" : "et",
      "required" : false,
      "type" : "string"
    }, {
@dramaticlly
Contributor

Looks like this was caused by a partition transform / hidden partitioning: the column ts is of type timestamptz, but the partition spec applies a days transform to ts, which results in ts_day being of type int.

If you can, could you share your original rewrite_position_delete_files command so we can try to reproduce it?
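
For illustration, a minimal sketch of the hidden-partitioning layout described in the report (using the table name from the stack trace and the two source columns quoted above; any other columns are omitted, and this is not the reporter's actual DDL):

```java
import org.apache.spark.sql.SparkSession;

public class HiddenPartitioningExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // 'ts' is timestamptz and 'et' is string; the days() transform derives an
    // int partition value (days since epoch, e.g. 19498) that the data_files
    // metadata table exposes as 'ts_day' with a DATE logical type.
    spark.sql(
        "CREATE TABLE internal_catalog.fanflow2iceberg.fanflow_event ("
            + "ts TIMESTAMP, et STRING) "
            + "USING iceberg "
            + "PARTITIONED BY (et, days(ts))");
  }
}
```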

@szehon-ho
Collaborator

I think I know the issue.

It is part of the code that does 'removeDanglingDeletes'. For each partition of delete files, I am trying to find 'live' data files so I can do the cleanup.

In this method, https://github.com/apache/iceberg/blob/master/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackPositionDeletesRewriter.java#L122 , I use the DeleteFile's partition data directly to query the data_files table. I thought it would work because the data_files table uses the transformed partition values, just as the DeleteFile partition data should.

But the partition data of a DeleteFile does not have the same type as the one exposed in the Spark metadata table. In particular, there is a difference between the underlying Avro types and the logical types defined in the spec: https://iceberg.apache.org/spec/#avro

Summary: the issue is not specific to partition transforms. It affects any partition field whose underlying Avro type differs from its logical type, i.e. date, time, etc.

I'm investigating a fix that adds a conversion from the Avro type to the logical type.
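
To make the mismatch concrete, here is a sketch of the conversion idea described above (this is only an illustration, not the actual change in #8059):

```java
// Illustrative sketch: convert an internal partition value for a 'day'
// partition field (days since epoch, stored as int) to the logical DATE type
// before comparing it against the data_files metadata table.
import java.time.LocalDate;
import org.apache.spark.sql.Column;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

public class PartitionFilterSketch {
  // The DeleteFile partition data holds an int (e.g. 19498), while
  // partition.ts_day in data_files is a DATE. Comparing them directly triggers
  // DATATYPE_MISMATCH; converting the int to a date first avoids it.
  static Column dayPartitionPredicate(String fieldName, int daysFromEpoch) {
    LocalDate date = LocalDate.ofEpochDay(daysFromEpoch);
    return col("partition." + fieldName).equalTo(lit(java.sql.Date.valueOf(date)));
  }
}
```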

Fokko added this to the Iceberg 1.3.1 milestone on Jul 12, 2023
Author

mcgray commented Jul 12, 2023

No arguments were passed to rewrite_position_delete_files apart from the table.
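
In other words, the procedure was invoked in its simplest form, roughly like this (a sketch using the catalog and table name from the stack trace above):

```java
import org.apache.spark.sql.SparkSession;

public class RewritePositionDeletes {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Table-only invocation, as described above; no other options were passed.
    spark.sql(
        "CALL internal_catalog.system.rewrite_position_delete_files("
            + "table => 'fanflow2iceberg.fanflow_event')");
  }
}
```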

@szehon-ho
Collaborator

I have a fix here: #8059


atifiu commented Oct 13, 2023

@szehon-ho Thanks for the fix. I am facing the same issue on Iceberg 1.3.0 while trying to remove delete files with the rewrite_position_delete_files procedure. The reason I need to remove delete files is that aggregate pushdown fails with the message SparkScanBuilder: Skipping aggregate pushdown: detected row level deletes. #6252 (comment)
I am also still not sure how delete files were created when I have defined merge-on-read for DML operations.
#6252 (comment)

So my question is: how can we remove delete files if we are still using 1.3.0? Is it somehow possible to manually remove references to delete files without corrupting the metadata? Thanks for your help.

23/10/13 00:16:56 ERROR RewritePositionDeleteFilesSparkAction: Failure during rewrite group FileGroupInfo{globalIndex=1, partitionIndex=1, partition=org.apache.iceberg.util.StructProjection@3162902b}
org.apache.spark.sql.AnalysisException: cannot resolve '(partition.`page_view_dtm_day` = 18384)' due to data type mismatch: differing types in '(partition.`page_view_dtm_day` = 18384)' (date and int).;
'Filter (partition#4925.page_view_dtm_day = 18384)
+- RelationV2[content#4921, file_path#4922, file_format#4923, spec_id#4924, partition#4925, record_count#4926L, file_size_in_bytes#4927L, column_sizes#4928, value_counts#4929, null_value_counts#4930, nan_value_counts#4931, lower_bounds#4932, upper_bounds#4933, key_metadata#4934, split_offsets#4935, equality_ids#4936, sort_order_id#4937, readable_metrics#4938]


atifiu commented Oct 15, 2023

We are still using Spark 3.3.1, so is there any way to get around this issue without upgrading to Spark 3.4 and Iceberg 1.3.1?

Contributor

Fokko commented Oct 15, 2023

@atifiu The patch has been backported to Spark 3.3 as well (#8069), so you only need to upgrade your Iceberg dependency.
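
For example, a minimal sketch of picking up the patched runtime for Spark 3.3 (assuming the standard iceberg-spark-runtime artifact coordinates and Scala 2.12):

```java
import org.apache.spark.sql.SparkSession;

public class UpgradedIcebergSession {
  public static void main(String[] args) {
    // Only the Iceberg runtime version changes; Spark stays on 3.3.x.
    SparkSession spark = SparkSession.builder()
        .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1")
        .getOrCreate();
  }
}
```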
