Skip to content

Commit

Permalink
Spark: Port apache#7980 to other spark versions
Browse files Browse the repository at this point in the history
  • Loading branch information
Rui Li committed Jul 14, 2023
1 parent 31dd70c commit edf28b0
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,26 @@ public void testMigratePartitionWithSpecialCharacter() throws IOException {
ImmutableList.of(row(1L, "2023/05/30", java.sql.Date.valueOf("2023-05-30"))),
sql("SELECT * FROM %s ORDER BY id", tableName));
}

@Test
public void testMigrateEmptyPartitionedTable() throws Exception {
// The migrate procedure is only available through the Spark session catalog.
Assume.assumeTrue(catalogName.equals("spark_catalog"));
// Create a partitioned parquet table backed by an empty directory (no data files).
String sourceLocation = temp.newFolder().toString();
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
tableName, sourceLocation);
// Migrating a table with no partitions should succeed and report zero migrated files.
Object migratedFileCount = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
Assert.assertEquals(0L, migratedFileCount);
}

@Test
public void testMigrateEmptyTable() throws Exception {
// The migrate procedure is only available through the Spark session catalog.
Assume.assumeTrue(catalogName.equals("spark_catalog"));
// Create an unpartitioned parquet table backed by an empty directory (no data files).
String sourceLocation = temp.newFolder().toString();
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
tableName, sourceLocation);
// Migrating an empty table should succeed and report zero migrated files.
Object migratedFileCount = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
Assert.assertEquals(0L, migratedFileCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,12 @@ public static void importSparkTable(
} else {
List<SparkPartition> sourceTablePartitions =
getPartitions(spark, sourceTableIdent, partitionFilter);
Preconditions.checkArgument(
!sourceTablePartitions.isEmpty(),
"Cannot find any partitions in table %s",
sourceTableIdent);
importSparkPartitions(
spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
if (sourceTablePartitions.isEmpty()) {
targetTable.newAppend().commit();
} else {
importSparkPartitions(
spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
}
}
} catch (AnalysisException e) {
throw SparkExceptionUtil.toUncheckedException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,26 @@ public void testMigratePartitionWithSpecialCharacter() throws IOException {
ImmutableList.of(row(1L, "2023/05/30", java.sql.Date.valueOf("2023-05-30"))),
sql("SELECT * FROM %s ORDER BY id", tableName));
}

@Test
public void testMigrateEmptyPartitionedTable() throws Exception {
// Only the session catalog supports the migrate procedure.
Assume.assumeTrue(catalogName.equals("spark_catalog"));
String emptyTableDir = temp.newFolder().toString();
// Define a partitioned source table over a fresh, file-less location.
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
tableName, emptyTableDir);
// Expect migration to succeed with a migrated-file count of zero.
Object migrateResult = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
Assert.assertEquals(0L, migrateResult);
}

@Test
public void testMigrateEmptyTable() throws Exception {
// Only the session catalog supports the migrate procedure.
Assume.assumeTrue(catalogName.equals("spark_catalog"));
String emptyTableDir = temp.newFolder().toString();
// Define an unpartitioned source table over a fresh, file-less location.
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
tableName, emptyTableDir);
// Expect migration to succeed with a migrated-file count of zero.
Object migrateResult = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
Assert.assertEquals(0L, migrateResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -440,12 +440,13 @@ public static void importSparkTable(
} else {
List<SparkPartition> sourceTablePartitions =
getPartitions(spark, sourceTableIdent, partitionFilter);
Preconditions.checkArgument(
!sourceTablePartitions.isEmpty(),
"Cannot find any partitions in table %s",
sourceTableIdent);
importSparkPartitions(
spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
if (sourceTablePartitions.isEmpty()) {
targetTable.newAppend().commit();
}
else {
importSparkPartitions(
spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
}
}
} catch (AnalysisException e) {
throw SparkExceptionUtil.toUncheckedException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,26 @@ public void testMigratePartitionWithSpecialCharacter() throws IOException {
ImmutableList.of(row(1L, "2023/05/30", java.sql.Date.valueOf("2023-05-30"))),
sql("SELECT * FROM %s ORDER BY id", tableName));
}

@Test
public void testMigrateEmptyPartitionedTable() throws Exception {
// Skip unless running against the Spark session catalog, where migrate is supported.
Assume.assumeTrue(catalogName.equals("spark_catalog"));
// A brand-new temp folder guarantees the table has no data files to import.
String location = temp.newFolder().toString();
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
tableName, location);
// The procedure should complete and return 0 migrated data files.
Object fileCount = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
Assert.assertEquals(0L, fileCount);
}

@Test
public void testMigrateEmptyTable() throws Exception {
// Skip unless running against the Spark session catalog, where migrate is supported.
Assume.assumeTrue(catalogName.equals("spark_catalog"));
// A brand-new temp folder guarantees the table has no data files to import.
String location = temp.newFolder().toString();
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
tableName, location);
// The procedure should complete and return 0 migrated data files.
Object fileCount = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
Assert.assertEquals(0L, fileCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,13 @@ public static void importSparkTable(
} else {
List<SparkPartition> sourceTablePartitions =
getPartitions(spark, sourceTableIdent, partitionFilter);
Preconditions.checkArgument(
!sourceTablePartitions.isEmpty(),
"Cannot find any partitions in table %s",
sourceTableIdent);
importSparkPartitions(
spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
if (sourceTablePartitions.isEmpty()) {
targetTable.newAppend().commit();
}
else {
importSparkPartitions(
spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
}
}
} catch (AnalysisException e) {
throw SparkExceptionUtil.toUncheckedException(
Expand Down

0 comments on commit edf28b0

Please sign in to comment.