Skip to content

Commit

Permalink
Review with Larry Part 4
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Oct 23, 2024
1 parent acf81a0 commit 650d70e
Showing 1 changed file with 25 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,19 +233,16 @@ public synchronized Optional<Schema> schema(final int schemaId) {
* if no snapshot is provided.
*/
@Nullable
private Snapshot getSnapshot(final IcebergReadInstructions readInstructions) {
final Snapshot snapshot;
private Snapshot getSnapshot(@NotNull final IcebergReadInstructions readInstructions) {
if (readInstructions.snapshot().isPresent()) {
snapshot = readInstructions.snapshot().get();
return readInstructions.snapshot().get();
} else if (readInstructions.tableSnapshotId().isPresent()) {
snapshot = snapshot(readInstructions.tableSnapshotId().getAsLong())
return snapshot(readInstructions.tableSnapshotId().getAsLong())
.orElseThrow(() -> new IllegalArgumentException(
"Snapshot with id " + readInstructions.tableSnapshotId().getAsLong() + " not found for " +
"table " + tableIdentifier));
} else {
snapshot = null;
}
return snapshot;
return null;
}

/**
Expand All @@ -270,10 +267,11 @@ private SpecAndSchema(
* Retrieve the schema and partition spec for the table based on the provided read instructions. Also, populate the
* read instructions with the requested snapshot, or the latest snapshot if none is requested.
*/
private SpecAndSchema getSpecAndSchema(@NotNull IcebergReadInstructions readInstructions) {
private SpecAndSchema getSpecAndSchema(@NotNull final IcebergReadInstructions readInstructions) {
final Snapshot snapshot;
final Schema schema;
final PartitionSpec partitionSpec;
final IcebergReadInstructions updatedInstructions;

final Snapshot snapshotFromInstructions = getSnapshot(readInstructions);
if (snapshotFromInstructions == null) {
Expand All @@ -286,17 +284,19 @@ private SpecAndSchema getSpecAndSchema(@NotNull IcebergReadInstructions readInst
}
if (snapshot != null) {
// Update the read instructions with the snapshot.
readInstructions = readInstructions.withSnapshot(snapshot);
updatedInstructions = readInstructions.withSnapshot(snapshot);
} else {
updatedInstructions = readInstructions;
}
} else {
// Use the schema from the snapshot
snapshot = snapshotFromInstructions;
schema = schema(snapshot.schemaId()).get();
partitionSpec = table.spec();
readInstructions = readInstructions.withSnapshot(snapshot);
updatedInstructions = readInstructions.withSnapshot(snapshot);
}

return new SpecAndSchema(schema, partitionSpec, readInstructions);
return new SpecAndSchema(schema, partitionSpec, updatedInstructions);
}

/**
Expand All @@ -314,16 +314,16 @@ public TableDefinition definition() {
* @param readInstructions The instructions for customizations while reading the table.
* @return The table definition
*/
public TableDefinition definition(@NotNull IcebergReadInstructions readInstructions) {
public TableDefinition definition(@NotNull final IcebergReadInstructions readInstructions) {
final SpecAndSchema specAndSchema = getSpecAndSchema(readInstructions);
final Schema schema = specAndSchema.schema;
final PartitionSpec partitionSpec = specAndSchema.partitionSpec;
readInstructions = specAndSchema.readInstructions;
final IcebergReadInstructions updatedInstructions = specAndSchema.readInstructions;

return fromSchema(schema,
partitionSpec,
readInstructions.tableDefinition().orElse(null),
getRenameColumnMap(table, schema, readInstructions));
updatedInstructions.tableDefinition().orElse(null),
getRenameColumnMap(table, schema, updatedInstructions));
}

/**
Expand Down Expand Up @@ -360,36 +360,36 @@ public IcebergTable table() {
* @param readInstructions The instructions for customizations while reading the table.
* @return The loaded table
*/
public IcebergTable table(@NotNull IcebergReadInstructions readInstructions) {
public IcebergTable table(@NotNull final IcebergReadInstructions readInstructions) {
final SpecAndSchema specAndSchema = getSpecAndSchema(readInstructions);
final Schema schema = specAndSchema.schema;
final PartitionSpec partitionSpec = specAndSchema.partitionSpec;
readInstructions = specAndSchema.readInstructions;
IcebergReadInstructions updatedInstructions = specAndSchema.readInstructions;

// Get the user supplied table definition.
final TableDefinition userTableDef = readInstructions.tableDefinition().orElse(null);
final TableDefinition userTableDef = updatedInstructions.tableDefinition().orElse(null);

// Map all the column names in the schema to their legalized names.
final Map<String, String> legalizedColumnRenames = getRenameColumnMap(table, schema, readInstructions);
final Map<String, String> legalizedColumnRenames = getRenameColumnMap(table, schema, updatedInstructions);

// Get the table definition from the schema (potentially limited by the user supplied table definition and
// applying column renames).
final TableDefinition tableDef = fromSchema(schema, partitionSpec, userTableDef, legalizedColumnRenames);

// Create the final instructions with the legalized column renames.
final IcebergReadInstructions finalInstructions = readInstructions.withColumnRenames(legalizedColumnRenames);
updatedInstructions = updatedInstructions.withColumnRenames(legalizedColumnRenames);

final IcebergBaseLayout keyFinder;
if (partitionSpec.isUnpartitioned()) {
// Create the flat layout location key finder
keyFinder = new IcebergFlatLayout(this, finalInstructions, dataInstructionsProviderLoader);
keyFinder = new IcebergFlatLayout(this, updatedInstructions, dataInstructionsProviderLoader);
} else {
// Create the partitioning column location key finder
keyFinder = new IcebergKeyValuePartitionedLayout(this, partitionSpec, finalInstructions,
keyFinder = new IcebergKeyValuePartitionedLayout(this, partitionSpec, updatedInstructions,
dataInstructionsProviderLoader);
}

if (finalInstructions.updateMode().updateType() == IcebergUpdateMode.IcebergUpdateType.STATIC) {
if (updatedInstructions.updateMode().updateType() == IcebergUpdateMode.IcebergUpdateType.STATIC) {
final IcebergTableLocationProviderBase<TableKey, IcebergTableLocationKey> locationProvider =
new IcebergStaticTableLocationProvider<>(
StandaloneTableKey.getInstance(),
Expand All @@ -408,7 +408,7 @@ public IcebergTable table(@NotNull IcebergReadInstructions readInstructions) {
final UpdateSourceRegistrar updateSourceRegistrar = ExecutionContext.getContext().getUpdateGraph();
final IcebergTableLocationProviderBase<TableKey, IcebergTableLocationKey> locationProvider;

if (finalInstructions.updateMode().updateType() == IcebergUpdateMode.IcebergUpdateType.MANUAL_REFRESHING) {
if (updatedInstructions.updateMode().updateType() == IcebergUpdateMode.IcebergUpdateType.MANUAL_REFRESHING) {
locationProvider = new IcebergManualRefreshTableLocationProvider<>(
StandaloneTableKey.getInstance(),
keyFinder,
Expand All @@ -421,7 +421,7 @@ public IcebergTable table(@NotNull IcebergReadInstructions readInstructions) {
keyFinder,
new IcebergTableLocationFactory(),
TableDataRefreshService.getSharedRefreshService(),
finalInstructions.updateMode().autoRefreshMs(),
updatedInstructions.updateMode().autoRefreshMs(),
this,
tableIdentifier);
}
Expand Down

0 comments on commit 650d70e

Please sign in to comment.