From edf28b0946e4d1f0340028423adf57ed5e129cbf Mon Sep 17 00:00:00 2001 From: Rui Li Date: Fri, 14 Jul 2023 14:40:16 +0800 Subject: [PATCH] Spark: Port #7980 to other spark versions --- .../extensions/TestMigrateTableProcedure.java | 22 +++++++++++++++++++ .../apache/iceberg/spark/SparkTableUtil.java | 12 +++++----- .../extensions/TestMigrateTableProcedure.java | 22 +++++++++++++++++++ .../apache/iceberg/spark/SparkTableUtil.java | 13 ++++++----- .../extensions/TestMigrateTableProcedure.java | 22 +++++++++++++++++++ .../apache/iceberg/spark/SparkTableUtil.java | 13 ++++++----- 6 files changed, 86 insertions(+), 18 deletions(-) diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java index 919a5131332b..d1adee74f28d 100644 --- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java +++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java @@ -201,4 +201,26 @@ public void testMigratePartitionWithSpecialCharacter() throws IOException { ImmutableList.of(row(1L, "2023/05/30", java.sql.Date.valueOf("2023-05-30"))), sql("SELECT * FROM %s ORDER BY id", tableName)); } + + @Test + public void testMigrateEmptyPartitionedTable() throws Exception { + Assume.assumeTrue(catalogName.equals("spark_catalog")); + String location = temp.newFolder().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'", + tableName, location); + Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName); + Assert.assertEquals(0L, result); + } + + @Test + public void testMigrateEmptyTable() throws Exception { + Assume.assumeTrue(catalogName.equals("spark_catalog")); + String location = temp.newFolder().toString(); + sql( + "CREATE 
TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", + tableName, location); + Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName); + Assert.assertEquals(0L, result); + } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index c1216f47ba82..ca19a982c9a6 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -446,12 +446,12 @@ public static void importSparkTable( } else { List sourceTablePartitions = getPartitions(spark, sourceTableIdent, partitionFilter); - Preconditions.checkArgument( - !sourceTablePartitions.isEmpty(), - "Cannot find any partitions in table %s", - sourceTableIdent); - importSparkPartitions( - spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles); + if (sourceTablePartitions.isEmpty()) { + targetTable.newAppend().commit(); + } else { + importSparkPartitions( + spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles); + } } } catch (AnalysisException e) { throw SparkExceptionUtil.toUncheckedException( diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java index 919a5131332b..d1adee74f28d 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java @@ -201,4 +201,26 @@ public void testMigratePartitionWithSpecialCharacter() throws IOException { ImmutableList.of(row(1L, "2023/05/30", java.sql.Date.valueOf("2023-05-30"))), sql("SELECT * FROM %s ORDER BY id", tableName)); 
} + + @Test + public void testMigrateEmptyPartitionedTable() throws Exception { + Assume.assumeTrue(catalogName.equals("spark_catalog")); + String location = temp.newFolder().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'", + tableName, location); + Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName); + Assert.assertEquals(0L, result); + } + + @Test + public void testMigrateEmptyTable() throws Exception { + Assume.assumeTrue(catalogName.equals("spark_catalog")); + String location = temp.newFolder().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", + tableName, location); + Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName); + Assert.assertEquals(0L, result); + } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 4bfff5b2c44a..6727b820e4c0 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -440,12 +440,12 @@ public static void importSparkTable( } else { List sourceTablePartitions = getPartitions(spark, sourceTableIdent, partitionFilter); - Preconditions.checkArgument( - !sourceTablePartitions.isEmpty(), - "Cannot find any partitions in table %s", - sourceTableIdent); - importSparkPartitions( - spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles); + if (sourceTablePartitions.isEmpty()) { + targetTable.newAppend().commit(); + } else { + importSparkPartitions( + spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles); + } } } catch (AnalysisException e) { throw SparkExceptionUtil.toUncheckedException( diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java index 49fee09408c5..b8b5df099089 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java @@ -201,4 +201,26 @@ public void testMigratePartitionWithSpecialCharacter() throws IOException { ImmutableList.of(row(1L, "2023/05/30", java.sql.Date.valueOf("2023-05-30"))), sql("SELECT * FROM %s ORDER BY id", tableName)); } + + @Test + public void testMigrateEmptyPartitionedTable() throws Exception { + Assume.assumeTrue(catalogName.equals("spark_catalog")); + String location = temp.newFolder().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'", + tableName, location); + Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName); + Assert.assertEquals(0L, result); + } + + @Test + public void testMigrateEmptyTable() throws Exception { + Assume.assumeTrue(catalogName.equals("spark_catalog")); + String location = temp.newFolder().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", + tableName, location); + Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName); + Assert.assertEquals(0L, result); + } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 03255e6a7dd1..d4a2a28f4cdd 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -441,12 +441,12 @@ public static void
importSparkTable( } else { List sourceTablePartitions = getPartitions(spark, sourceTableIdent, partitionFilter); - Preconditions.checkArgument( - !sourceTablePartitions.isEmpty(), - "Cannot find any partitions in table %s", - sourceTableIdent); - importSparkPartitions( - spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles); + if (sourceTablePartitions.isEmpty()) { + targetTable.newAppend().commit(); + } else { + importSparkPartitions( + spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles); + } } } catch (AnalysisException e) { throw SparkExceptionUtil.toUncheckedException(