Skip to content

Commit

Permalink
Added python support and updated tests
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Oct 23, 2024
1 parent 650d70e commit 315ea18
Show file tree
Hide file tree
Showing 6 changed files with 556 additions and 553 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ public IcebergUpdateMode updateMode() {
* {@link Snapshot#snapshotId()} should match this. Otherwise, only one of them should be provided. If neither is
* provided, the latest snapshot will be loaded.
*/
public abstract OptionalLong tableSnapshotId();
public abstract OptionalLong snapshotId();

/**
* The snapshot to load for reading. If both this and {@link #tableSnapshotId()} are provided, the
* {@link Snapshot#snapshotId()} should match the {@link #tableSnapshotId()}. Otherwise, only one of them should be
* Return a copy of this instructions object with the snapshot ID replaced by {@code value}.
*/
public abstract IcebergReadInstructions withSnapshotId(long value);

/**
* The snapshot to load for reading. If both this and {@link #snapshotId()} are provided, the
* {@link Snapshot#snapshotId()} should match the {@link #snapshotId()}. Otherwise, only one of them should be
* provided. If neither is provided, the latest snapshot will be loaded.
*/
public abstract Optional<Snapshot> snapshot();
Expand All @@ -92,7 +97,7 @@ public interface Builder {

Builder updateMode(IcebergUpdateMode updateMode);

Builder tableSnapshotId(long tableSnapshotId);
Builder snapshotId(long snapshotId);

Builder snapshot(Snapshot snapshot);

Expand All @@ -101,10 +106,10 @@ public interface Builder {

@Value.Check
final void checkSnapshotId() {
if (tableSnapshotId().isPresent() && snapshot().isPresent() &&
tableSnapshotId().getAsLong() != snapshot().get().snapshotId()) {
throw new IllegalArgumentException("If both tableSnapshotId and snapshot are provided, the snapshotId " +
"must match");
if (snapshotId().isPresent() && snapshot().isPresent() &&
snapshotId().getAsLong() != snapshot().get().snapshotId()) {
throw new IllegalArgumentException("If both snapshotID and snapshot are provided, the snapshot Ids " +
"must match, found " + snapshotId().getAsLong() + " and " + snapshot().get().snapshotId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ public synchronized Optional<Schema> schema(final int schemaId) {
private Snapshot getSnapshot(@NotNull final IcebergReadInstructions readInstructions) {
if (readInstructions.snapshot().isPresent()) {
return readInstructions.snapshot().get();
} else if (readInstructions.tableSnapshotId().isPresent()) {
return snapshot(readInstructions.tableSnapshotId().getAsLong())
} else if (readInstructions.snapshotId().isPresent()) {
return snapshot(readInstructions.snapshotId().getAsLong())
.orElseThrow(() -> new IllegalArgumentException(
"Snapshot with id " + readInstructions.tableSnapshotId().getAsLong() + " not found for " +
"Snapshot with id " + readInstructions.snapshotId().getAsLong() + " not found for " +
"table " + tableIdentifier));
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ void cities1() {
{
final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(CITIES_ID);
final TableDefinition td = tableAdapter.definition(IcebergReadInstructions.builder()
.tableSnapshotId(SNAPSHOT_1_ID)
.snapshotId(SNAPSHOT_1_ID)
.build());
assertThat(td).isEqualTo(CITIES_1_TD);

cities1 = tableAdapter.table(IcebergReadInstructions.builder()
.tableSnapshotId(SNAPSHOT_1_ID)
.snapshotId(SNAPSHOT_1_ID)
.build());
assertThat(cities1.getDefinition()).isEqualTo(CITIES_1_TD);
}
Expand All @@ -107,12 +107,12 @@ void cities2() {
{
final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(CITIES_ID);
final TableDefinition td = tableAdapter.definition(IcebergReadInstructions.builder()
.tableSnapshotId(SNAPSHOT_2_ID)
.snapshotId(SNAPSHOT_2_ID)
.build());
assertThat(td).isEqualTo(CITIES_2_TD);

cities2 = tableAdapter.table(IcebergReadInstructions.builder()
.tableSnapshotId(SNAPSHOT_2_ID)
.snapshotId(SNAPSHOT_2_ID)
.build());
assertThat(cities2.getDefinition()).isEqualTo(CITIES_2_TD);
}
Expand Down
35 changes: 12 additions & 23 deletions py/server/deephaven/experimental/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
_JIcebergTableAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergTableAdapter")
_JIcebergTable = jpy.get_type("io.deephaven.iceberg.util.IcebergTable")
_JIcebergTools = jpy.get_type("io.deephaven.iceberg.util.IcebergTools")
_JIcebergDefinitionTable = jpy.get_type("io.deephaven.iceberg.util.IcebergDefinitionTable")
_JIcebergReadTable = jpy.get_type("io.deephaven.iceberg.util.IcebergReadTable")

# IcebergToolsS3 is an optional library
try:
Expand Down Expand Up @@ -93,7 +91,8 @@ def __init__(self,
table_definition: Optional[TableDefinitionLike] = None,
data_instructions: Optional[s3.S3Instructions] = None,
column_renames: Optional[Dict[str, str]] = None,
update_mode: Optional[IcebergUpdateMode] = None):
update_mode: Optional[IcebergUpdateMode] = None,
snapshot_id: Optional[int] = None):
"""
Initializes the instructions using the provided parameters.
Expand All @@ -107,6 +106,7 @@ def __init__(self,
the output table.
update_mode (Optional[IcebergUpdateMode]): The update mode for the table. If omitted, the default update
mode of :py:func:`IcebergUpdateMode.static() <IcebergUpdateMode.static>` is used.
snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected.
Raises:
DHError: If unable to build the instructions object.
Expand All @@ -128,6 +128,9 @@ def __init__(self,
if update_mode:
builder.updateMode(update_mode.j_object)

if snapshot_id:
builder.snapshotId(snapshot_id)

self._j_object = builder.build()
except Exception as e:
raise DHError(e, "Failed to build Iceberg instructions") from e
Expand Down Expand Up @@ -201,32 +204,24 @@ def snapshots(self) -> Table:
"""
return Table(self.j_object.snapshots())

def definition(self, instructions: Optional[IcebergReadInstructions] = None, snapshot_id: Optional[int] = None) -> Table:
def definition(self, instructions: Optional[IcebergReadInstructions] = None) -> Table:
"""
Returns the Deephaven table definition as a Deephaven table.
Args:
instructions (Optional[IcebergReadInstructions]): the instructions for reading the table. These instructions
can include column renames, table definition, and specific data instructions for reading the data files
from the provider. If omitted, the table will be read with default instructions.
snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected.
Returns:
a table containing the table definition.
"""

builder = _JIcebergDefinitionTable.builder()

if snapshot_id is not None:
builder.tableSnapshotId(snapshot_id)

if instructions is not None:
builder.instructions(instructions.j_object)
return Table(self.j_object.definitionTable(instructions.j_object))
return Table(self.j_object.definitionTable())

return Table(self.j_object.definitionTable(builder.build()))


def table(self, instructions: Optional[IcebergReadInstructions] = None, snapshot_id: Optional[int] = None) -> IcebergTable:
def table(self, instructions: Optional[IcebergReadInstructions] = None) -> IcebergTable:
"""
Reads the table using the provided instructions. To read a specific snapshot of the table, set `snapshot_id` on
the instructions; if omitted, the most recent snapshot will be selected.
Expand All @@ -236,21 +231,15 @@ def table(self, instructions: Optional[IcebergReadInstructions] = None, snapshot
can include column renames, table definition, and specific data instructions for reading the data files
from the provider. If omitted, the table will be read in `static()` mode without column renames or data
instructions.
snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected.
Returns:
Table: the table read from the catalog.
"""

builder = _JIcebergReadTable.builder()

if snapshot_id is not None:
builder.tableSnapshotId(snapshot_id)

if instructions is not None:
builder.instructions(instructions.j_object)
return IcebergTable(self.j_object.table(instructions.j_object))
return IcebergTable(self.j_object.table())

return IcebergTable(self.j_object.table(builder.build()))

@property
def j_object(self) -> jpy.JType:
Expand Down
4 changes: 4 additions & 0 deletions py/server/tests/test_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ def test_instruction_create_with_table_definition_list(self):
self.assertTrue(col_names[1] == "x")
self.assertTrue(col_names[2] == "y")
self.assertTrue(col_names[3] == "z")

def test_instruction_create_with_snapshot_id(self):
    """Verify that ``snapshot_id`` is forwarded to the underlying Java read instructions."""
    iceberg_read_instructions = iceberg.IcebergReadInstructions(snapshot_id=12345)
    # snapshotId() is declared on the Java side as a java.util.OptionalLong,
    # which exposes getAsLong() rather than get().
    j_snapshot_id = iceberg_read_instructions.j_object.snapshotId()
    self.assertTrue(j_snapshot_id.isPresent())
    # assertEqual reports both values on failure, unlike assertTrue(a == b).
    self.assertEqual(j_snapshot_id.getAsLong(), 12345)

0 comments on commit 315ea18

Please sign in to comment.