Skip to content

Commit

Permalink
add sql test
Browse files Browse the repository at this point in the history
  • Loading branch information
lurnagao-dahua authored and lurnagao-dahua committed Sep 26, 2024
1 parent 928689e commit 10140e9
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 3 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ public TableMetadata upgradeToFormatVersion(int newFormatVersion) {
return new Builder(this).upgradeFormatVersion(newFormatVersion).build();
}

public static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec partitionSpec) {
protected static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec partitionSpec) {
PartitionSpec.Builder specBuilder =
PartitionSpec.builderFor(schema).withSpecId(partitionSpec.specId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,12 @@ public void testManifestLocationsInScanWithDeleteFiles() throws IOException {
}

@TestTemplate
public void testTimeTravelScanWithAlterColumn() throws Exception {
public void testTimeTravelScanWithAlterColumn() {
table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
long timeTravelSnapshotId = table.currentSnapshot().snapshotId();
table.updateSchema().renameColumn("id", "re_id").commit();
TableScan scan =
table.newScan().useSnapshot(timeTravelSnapshotId).filter(Expressions.equal("id", 5));
assertThat(Iterables.size(scan.planFiles())).isEqualTo(2);
assertThat(scan.planFiles()).hasSize(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -521,4 +521,82 @@ public void testComplexTypeFilter() {
assertEquals("Should return all expected rows", ImmutableList.of(row(1)), result);
sql("DROP TABLE IF EXISTS %s", complexTypeTableName);
}

@Test
public void testTimestampAsOfWithAlterColumn() {
// Pick a point in time strictly after the current snapshot was committed, so
// TIMESTAMP AS OF resolves to that snapshot.
long snapshotTs = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis();
long timestamp = waitUntilAfter(snapshotTs + 1000);
// Let the wall clock move past the chosen timestamp so the ALTER/INSERT below
// are committed strictly after it (return value intentionally discarded).
waitUntilAfter(timestamp + 1000);
// The numeric TIMESTAMP AS OF literal is passed in seconds, hence the conversion.
long timestampInSeconds = TimeUnit.MILLISECONDS.toSeconds(timestamp);
// Baseline: rows visible under the original column name, before any change.
List<Object[]> expected = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Mutate the table after the captured timestamp: rename the filter column and add rows.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// query with timestamp will use snapshot schema
// The old name "id" must still resolve, and the rows inserted afterwards must
// not be visible in the time-travel result.
List<Object[]> actualWithLongFormat =
sql("SELECT * FROM %s TIMESTAMP AS OF %s WHERE id = 1", tableName, timestampInSeconds);
assertEquals("Snapshot at timestamp", expected, actualWithLongFormat);
}

@Test
public void testVersionAsOfWithAlterColumn() {
// Remember the current snapshot and the rows it returns for the original column name.
long versionSnapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
List<Object[]> rowsBeforeAlter = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Evolve the schema and append more data after the remembered snapshot.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// VERSION AS OF a snapshot id resolves columns against that snapshot's schema,
// so the old column name "id" still works and the new rows are excluded.
List<Object[]> timeTravelRows =
sql("SELECT * FROM %s VERSION AS OF %s WHERE id = 1", tableName, versionSnapshotId);
assertEquals("Snapshot at specific ID", rowsBeforeAlter, timeTravelRows);
}

@Test
public void testBranchReferenceWithAlterColumn() {
// Create a branch at the current snapshot and capture the rows it returns.
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createBranch("test_branch", snapshotId).commit();
List<Object[]> expected = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Rename the column and insert rows on the main branch only.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// query with branch name will use table schema
// Unlike snapshot-id/timestamp travel, branch reads use the table's CURRENT
// schema, so the filter must reference the renamed column "re_id".
List<Object[]> actual1 =
sql("SELECT * FROM %s VERSION AS OF 'test_branch' WHERE re_id = 1", tableName);
assertEquals("Snapshot at specific branch reference name", expected, actual1);

// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// query with branch name will use table schema
// Same read expressed via the `table.branch_<name>` identifier suffix.
List<Object[]> actual2 =
sql("SELECT * FROM %s.branch_test_branch WHERE re_id = 1", tableName);
assertEquals("Snapshot at specific branch reference name, prefix", expected, actual2);
}
}

@Test
public void testTagReferenceWithAlterColumn() {
// Tag the current snapshot and capture the rows it returns.
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag("test_tag", snapshotId).commit();
List<Object[]> expected = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Rename the column and insert rows after the tag was created.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// query with tag name will use snapshot schema
// Tags pin an immutable snapshot, so the read uses the snapshot's schema:
// the old column name "id" resolves, and the later inserts are not visible.
List<Object[]> actual1 =
sql("SELECT * FROM %s VERSION AS OF 'test_tag' WHERE id = 1", tableName);
assertEquals("Snapshot at specific tag reference name", expected, actual1);

// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// query with tag name will use snapshot schema
// Same read expressed via the `table.tag_<name>` identifier suffix.
List<Object[]> actual2 = sql("SELECT * FROM %s.tag_test_tag WHERE id = 1", tableName);
assertEquals("Snapshot at specific tag reference name, prefix", expected, actual2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -516,4 +516,82 @@ public void testComplexTypeFilter() {
assertEquals("Should return all expected rows", ImmutableList.of(row(1)), result);
sql("DROP TABLE IF EXISTS %s", complexTypeTableName);
}

@Test
public void testTimestampAsOfWithAlterColumn() {
// Pick a point in time strictly after the current snapshot was committed, so
// TIMESTAMP AS OF resolves to that snapshot.
long snapshotTs = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis();
long timestamp = waitUntilAfter(snapshotTs + 1000);
// Let the wall clock move past the chosen timestamp so the ALTER/INSERT below
// are committed strictly after it (return value intentionally discarded).
waitUntilAfter(timestamp + 1000);
// The numeric TIMESTAMP AS OF literal is passed in seconds, hence the conversion.
long timestampInSeconds = TimeUnit.MILLISECONDS.toSeconds(timestamp);
// Baseline: rows visible under the original column name, before any change.
List<Object[]> expected = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Mutate the table after the captured timestamp: rename the filter column and add rows.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// query with timestamp will use snapshot schema
// The old name "id" must still resolve, and the rows inserted afterwards must
// not be visible in the time-travel result.
List<Object[]> actualWithLongFormat =
sql("SELECT * FROM %s TIMESTAMP AS OF %s WHERE id = 1", tableName, timestampInSeconds);
assertEquals("Snapshot at timestamp", expected, actualWithLongFormat);
}

@Test
public void testVersionAsOfWithAlterColumn() {
// Remember the current snapshot and the rows it returns for the original column name.
long versionSnapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
List<Object[]> rowsBeforeAlter = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Evolve the schema and append more data after the remembered snapshot.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// VERSION AS OF a snapshot id resolves columns against that snapshot's schema,
// so the old column name "id" still works and the new rows are excluded.
List<Object[]> timeTravelRows =
sql("SELECT * FROM %s VERSION AS OF %s WHERE id = 1", tableName, versionSnapshotId);
assertEquals("Snapshot at specific ID", rowsBeforeAlter, timeTravelRows);
}

@Test
public void testBranchReferenceWithAlterColumn() {
// Create a branch at the current snapshot and capture the rows it returns.
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createBranch("test_branch", snapshotId).commit();
List<Object[]> expected = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Rename the column and insert rows on the main branch only.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// query with branch name will use table schema
// Unlike snapshot-id/timestamp travel, branch reads use the table's CURRENT
// schema, so the filter must reference the renamed column "re_id".
List<Object[]> actual1 =
sql("SELECT * FROM %s VERSION AS OF 'test_branch' WHERE re_id = 1", tableName);
assertEquals("Snapshot at specific branch reference name", expected, actual1);

// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// query with branch name will use table schema
// Same read expressed via the `table.branch_<name>` identifier suffix.
List<Object[]> actual2 =
sql("SELECT * FROM %s.branch_test_branch WHERE re_id = 1", tableName);
assertEquals("Snapshot at specific branch reference name, prefix", expected, actual2);
}
}

@Test
public void testTagReferenceWithAlterColumn() {
// Tag the current snapshot and capture the rows it returns.
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag("test_tag", snapshotId).commit();
List<Object[]> expected = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Rename the column and insert rows after the tag was created.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// query with tag name will use snapshot schema
// Tags pin an immutable snapshot, so the read uses the snapshot's schema:
// the old column name "id" resolves, and the later inserts are not visible.
List<Object[]> actual1 =
sql("SELECT * FROM %s VERSION AS OF 'test_tag' WHERE id = 1", tableName);
assertEquals("Snapshot at specific tag reference name", expected, actual1);

// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// query with tag name will use snapshot schema
// Same read expressed via the `table.tag_<name>` identifier suffix.
List<Object[]> actual2 = sql("SELECT * FROM %s.tag_test_tag WHERE id = 1", tableName);
assertEquals("Snapshot at specific tag reference name, prefix", expected, actual2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -597,4 +597,82 @@ public void testComplexTypeFilter() {
assertEquals("Should return all expected rows", ImmutableList.of(row(1)), result);
sql("DROP TABLE IF EXISTS %s", complexTypeTableName);
}

@TestTemplate
public void testTimestampAsOfWithAlterColumn() {
// Pick a point in time strictly after the current snapshot was committed, so
// TIMESTAMP AS OF resolves to that snapshot.
long snapshotTs = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis();
long timestamp = waitUntilAfter(snapshotTs + 1000);
// Let the wall clock move past the chosen timestamp so the ALTER/INSERT below
// are committed strictly after it (return value intentionally discarded).
waitUntilAfter(timestamp + 1000);
// The numeric TIMESTAMP AS OF literal is passed in seconds, hence the conversion.
long timestampInSeconds = TimeUnit.MILLISECONDS.toSeconds(timestamp);
// Baseline: rows visible under the original column name, before any change.
List<Object[]> expected = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Mutate the table after the captured timestamp: rename the filter column and add rows.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// query with timestamp will use snapshot schema
// The old name "id" must still resolve, and the rows inserted afterwards must
// not be visible in the time-travel result.
List<Object[]> actualWithLongFormat =
sql("SELECT * FROM %s TIMESTAMP AS OF %s WHERE id = 1", tableName, timestampInSeconds);
assertEquals("Snapshot at timestamp", expected, actualWithLongFormat);
}

@TestTemplate
public void testVersionAsOfWithAlterColumn() {
// Remember the current snapshot and the rows it returns for the original column name.
long versionSnapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
List<Object[]> rowsBeforeAlter = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Evolve the schema and append more data after the remembered snapshot.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// VERSION AS OF a snapshot id resolves columns against that snapshot's schema,
// so the old column name "id" still works and the new rows are excluded.
List<Object[]> timeTravelRows =
sql("SELECT * FROM %s VERSION AS OF %s WHERE id = 1", tableName, versionSnapshotId);
assertEquals("Snapshot at specific ID", rowsBeforeAlter, timeTravelRows);
}

@TestTemplate
public void testBranchReferenceWithAlterColumn() {
// Create a branch at the current snapshot and capture the rows it returns.
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createBranch("test_branch", snapshotId).commit();
List<Object[]> expected = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Rename the column and insert rows on the main branch only.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// query with branch name will use table schema
// Unlike snapshot-id/timestamp travel, branch reads use the table's CURRENT
// schema, so the filter must reference the renamed column "re_id".
List<Object[]> actual1 =
sql("SELECT * FROM %s VERSION AS OF 'test_branch' WHERE re_id = 1", tableName);
assertEquals("Snapshot at specific branch reference name", expected, actual1);

// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// query with branch name will use table schema
// Same read expressed via the `table.branch_<name>` identifier suffix.
List<Object[]> actual2 =
sql("SELECT * FROM %s.branch_test_branch WHERE re_id = 1", tableName);
assertEquals("Snapshot at specific branch reference name, prefix", expected, actual2);
}
}

@TestTemplate
public void testTagReferenceWithAlterColumn() {
// Tag the current snapshot and capture the rows it returns.
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag("test_tag", snapshotId).commit();
List<Object[]> expected = sql("SELECT * FROM %s WHERE id = 1", tableName);

// Rename the column and insert rows after the tag was created.
sql("ALTER TABLE %s RENAME COLUMN id TO re_id", tableName);
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);

// query with tag name will use snapshot schema
// Tags pin an immutable snapshot, so the read uses the snapshot's schema:
// the old column name "id" resolves, and the later inserts are not visible.
List<Object[]> actual1 =
sql("SELECT * FROM %s VERSION AS OF 'test_tag' WHERE id = 1", tableName);
assertEquals("Snapshot at specific tag reference name", expected, actual1);

// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// query with tag name will use snapshot schema
// Same read expressed via the `table.tag_<name>` identifier suffix.
List<Object[]> actual2 = sql("SELECT * FROM %s.tag_test_tag WHERE id = 1", tableName);
assertEquals("Snapshot at specific tag reference name, prefix", expected, actual2);
}
}
}

0 comments on commit 10140e9

Please sign in to comment.