Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add catalogVersion into QueryInputMetadata & QueryOutputMetadata #17497

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ private static QueryIOMetadata getQueryIOMetadata(QueryInfo queryInfo)

inputs.add(new QueryInputMetadata(
input.getCatalogName(),
input.getCatalogVersion(),
input.getSchema(),
input.getTable(),
input.getColumns().stream()
Expand Down Expand Up @@ -462,6 +463,7 @@ private static QueryIOMetadata getQueryIOMetadata(QueryInfo queryInfo)
output = Optional.of(
new QueryOutputMetadata(
queryInfo.getOutput().get().getCatalogName(),
queryInfo.getOutput().get().getCatalogVersion(),
queryInfo.getOutput().get().getSchema(),
queryInfo.getOutput().get().getTable(),
outputColumnsMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ else if (element instanceof LikeClause likeClause) {
}
outputConsumer.accept(new Output(
catalogName,
catalogHandle.getVersion(),
tableName.getSchemaName(),
tableName.getObjectName(),
Optional.of(tableMetadata.getColumns().stream()
Expand Down
36 changes: 20 additions & 16 deletions core/trino-main/src/main/java/io/trino/execution/Input.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.CatalogHandle.CatalogVersion;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;

Expand All @@ -32,6 +33,7 @@
public final class Input
{
private final String catalogName;
private final CatalogVersion catalogVersion;
private final String schema;
private final String table;
private final List<Column> columns;
Expand All @@ -42,28 +44,22 @@ public final class Input
@JsonCreator
public Input(
@JsonProperty("catalogName") String catalogName,
@JsonProperty("catalogVersion") CatalogVersion catalogVersion,
@JsonProperty("schema") String schema,
@JsonProperty("table") String table,
@JsonProperty("connectorInfo") Optional<Object> connectorInfo,
@JsonProperty("columns") List<Column> columns,
@JsonProperty("fragmentId") PlanFragmentId fragmentId,
@JsonProperty("planNodeId") PlanNodeId planNodeId)
{
requireNonNull(catalogName, "catalogName is null");
requireNonNull(schema, "schema is null");
requireNonNull(table, "table is null");
requireNonNull(connectorInfo, "connectorInfo is null");
requireNonNull(columns, "columns is null");
requireNonNull(fragmentId, "fragmentId is null");
requireNonNull(planNodeId, "planNodeId is null");

this.catalogName = catalogName;
this.schema = schema;
this.table = table;
this.connectorInfo = connectorInfo;
this.columns = ImmutableList.copyOf(columns);
this.fragmentId = fragmentId;
this.planNodeId = planNodeId;
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.catalogVersion = requireNonNull(catalogVersion, "catalogVersion is null");
this.schema = requireNonNull(schema, "schema is null");
this.table = requireNonNull(table, "table is null");
this.connectorInfo = requireNonNull(connectorInfo, "connectorInfo is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.fragmentId = requireNonNull(fragmentId, "fragmentId is null");
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
}

@JsonProperty
Expand All @@ -72,6 +68,12 @@ public String getCatalogName()
return catalogName;
}

@JsonProperty
public CatalogVersion getCatalogVersion()
{
return catalogVersion;
}

@JsonProperty
public String getSchema()
{
Expand Down Expand Up @@ -119,6 +121,7 @@ public boolean equals(Object o)
}
Input input = (Input) o;
return Objects.equals(catalogName, input.catalogName) &&
Objects.equals(catalogVersion, input.catalogVersion) &&
Objects.equals(schema, input.schema) &&
Objects.equals(table, input.table) &&
Objects.equals(columns, input.columns) &&
Expand All @@ -130,14 +133,15 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
return Objects.hash(catalogName, schema, table, columns, connectorInfo, fragmentId, planNodeId);
return Objects.hash(catalogName, catalogVersion, schema, table, columns, connectorInfo, fragmentId, planNodeId);
}

@Override
public String toString()
{
return toStringHelper(this)
.addValue(catalogName)
.addValue(catalogVersion)
.addValue(schema)
.addValue(table)
.addValue(columns)
Expand Down
16 changes: 12 additions & 4 deletions core/trino-main/src/main/java/io/trino/sql/analyzer/Analysis.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.trino.security.SecurityContext;
import io.trino.spi.QueryId;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.CatalogHandle.CatalogVersion;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnSchema;
import io.trino.spi.connector.ConnectorTableMetadata;
Expand Down Expand Up @@ -265,7 +266,7 @@ public Optional<Output> getTarget()
{
return target.map(target -> {
QualifiedObjectName name = target.getName();
return new Output(name.getCatalogName(), name.getSchemaName(), name.getObjectName(), target.getColumns());
return new Output(name.getCatalogName(), target.getCatalogVersion(), name.getSchemaName(), name.getObjectName(), target.getColumns());
});
}

Expand All @@ -276,9 +277,9 @@ public void setUpdateType(String updateType)
}
}

public void setUpdateTarget(QualifiedObjectName targetName, Optional<Table> targetTable, Optional<List<OutputColumn>> targetColumns)
public void setUpdateTarget(CatalogVersion catalogVersion, QualifiedObjectName targetName, Optional<Table> targetTable, Optional<List<OutputColumn>> targetColumns)
{
this.target = Optional.of(new UpdateTarget(targetName, targetTable, targetColumns));
this.target = Optional.of(new UpdateTarget(catalogVersion, targetName, targetTable, targetColumns));
}

public boolean isUpdateTarget(Table table)
Expand Down Expand Up @@ -2038,17 +2039,24 @@ public String getAuthorization()

private static class UpdateTarget
{
private final CatalogVersion catalogVersion;
private final QualifiedObjectName name;
private final Optional<Table> table;
private final Optional<List<OutputColumn>> columns;

public UpdateTarget(QualifiedObjectName name, Optional<Table> table, Optional<List<OutputColumn>> columns)
public UpdateTarget(CatalogVersion catalogVersion, QualifiedObjectName name, Optional<Table> table, Optional<List<OutputColumn>> columns)
{
this.catalogVersion = requireNonNull(catalogVersion, "catalogVersion is null");
this.name = requireNonNull(name, "name is null");
this.table = requireNonNull(table, "table is null");
this.columns = columns.map(ImmutableList::copyOf);
}

private CatalogVersion getCatalogVersion()
{
return catalogVersion;
}

public QualifiedObjectName getName()
{
return name;
Expand Down
13 changes: 12 additions & 1 deletion core/trino-main/src/main/java/io/trino/sql/analyzer/Output.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.CatalogHandle.CatalogVersion;

import javax.annotation.concurrent.Immutable;

Expand All @@ -29,18 +30,21 @@
public final class Output
{
private final String catalogName;
private final CatalogVersion catalogVersion;
private final String schema;
private final String table;
private final Optional<List<OutputColumn>> columns;

@JsonCreator
public Output(
@JsonProperty("catalogName") String catalogName,
@JsonProperty("catalogVersion") CatalogVersion catalogVersion,
@JsonProperty("schema") String schema,
@JsonProperty("table") String table,
@JsonProperty("columns") Optional<List<OutputColumn>> columns)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.catalogVersion = requireNonNull(catalogVersion, "catalogVersion is null");
this.schema = requireNonNull(schema, "schema is null");
this.table = requireNonNull(table, "table is null");
this.columns = columns.map(ImmutableList::copyOf);
Expand All @@ -52,6 +56,12 @@ public String getCatalogName()
return catalogName;
}

@JsonProperty
public CatalogVersion getCatalogVersion()
{
return catalogVersion;
}

@JsonProperty
public String getSchema()
{
Expand Down Expand Up @@ -81,6 +91,7 @@ public boolean equals(Object o)
}
Output output = (Output) o;
return Objects.equals(catalogName, output.catalogName) &&
Objects.equals(catalogVersion, output.catalogVersion) &&
Objects.equals(schema, output.schema) &&
Objects.equals(table, output.table) &&
Objects.equals(columns, output.columns);
Expand All @@ -89,6 +100,6 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
return Objects.hash(catalogName, schema, table, columns);
return Objects.hash(catalogName, catalogVersion, schema, table, columns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ protected Scope visitInsert(Insert insert, Optional<Scope> scope)

analysis.setUpdateType("INSERT");
analysis.setUpdateTarget(
targetTableHandle.get().getCatalogHandle().getVersion(),
targetTable,
Optional.empty(),
Optional.of(Streams.zip(
Expand All @@ -654,8 +655,10 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate
analysis.setUpdateType("REFRESH MATERIALIZED VIEW");

if (metadata.delegateMaterializedViewRefreshToConnector(session, name)) {
CatalogHandle catalogHandle = getRequiredCatalogHandle(metadata, session, refreshMaterializedView, name.getCatalogName());
analysis.setDelegatedRefreshMaterializedView(name);
analysis.setUpdateTarget(
catalogHandle.getVersion(),
name,
Optional.empty(),
Optional.empty());
Expand Down Expand Up @@ -702,6 +705,7 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate
Column::new);

analysis.setUpdateTarget(
targetTableHandle.getCatalogHandle().getVersion(),
targetTable,
Optional.empty(),
Optional.of(Streams.zip(
Expand Down Expand Up @@ -803,7 +807,7 @@ protected Scope visitDelete(Delete node, Optional<Scope> scope)
node.getWhere().ifPresent(where -> analyzeWhere(node, tableScope, where));

analysis.setUpdateType("DELETE");
analysis.setUpdateTarget(tableName, Optional.of(table), Optional.empty());
analysis.setUpdateTarget(handle.getCatalogHandle().getVersion(), tableName, Optional.of(table), Optional.empty());
Scope accessControlScope = Scope.builder()
.withRelationType(RelationId.anonymous(), analysis.getScope(table).getRelationType())
.build();
Expand All @@ -820,16 +824,16 @@ protected Scope visitDelete(Delete node, Optional<Scope> scope)
protected Scope visitAnalyze(Analyze node, Optional<Scope> scope)
{
QualifiedObjectName tableName = createQualifiedObjectName(session, node, node.getTableName());
analysis.setUpdateType("ANALYZE");
analysis.setUpdateTarget(tableName, Optional.empty(), Optional.empty());

if (metadata.isView(session, tableName)) {
throw semanticException(NOT_SUPPORTED, node, "Analyzing views is not supported");
}

TableHandle tableHandle = metadata.getTableHandle(session, tableName)
.orElseThrow(() -> semanticException(TABLE_NOT_FOUND, node, "Table '%s' does not exist", tableName));

analysis.setUpdateType("ANALYZE");
analysis.setUpdateTarget(tableHandle.getCatalogHandle().getVersion(), tableName, Optional.empty(), Optional.empty());

validateProperties(node.getProperties(), scope);
String catalogName = tableName.getCatalogName();
CatalogHandle catalogHandle = getRequiredCatalogHandle(metadata, session, node, catalogName);
Expand Down Expand Up @@ -879,7 +883,7 @@ protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional<Scop
node.isWithData(),
true));
analysis.setUpdateType("CREATE TABLE");
analysis.setUpdateTarget(targetTable, Optional.empty(), Optional.of(ImmutableList.of()));
analysis.setUpdateTarget(targetTableHandle.get().getCatalogHandle().getVersion(), targetTable, Optional.empty(), Optional.of(ImmutableList.of()));
return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
}
throw semanticException(TABLE_ALREADY_EXISTS, node, "Destination table '%s' already exists", targetTable);
Expand Down Expand Up @@ -969,6 +973,7 @@ protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional<Scop

analysis.setUpdateType("CREATE TABLE");
analysis.setUpdateTarget(
catalogHandle.getVersion(),
targetTable,
Optional.empty(),
Optional.of(outputColumns.build()));
Expand All @@ -990,8 +995,10 @@ protected Scope visitCreateView(CreateView node, Optional<Scope> scope)

validateColumns(node, queryScope.getRelationType());

CatalogHandle catalogHandle = getRequiredCatalogHandle(metadata, session, node, viewName.getCatalogName());
analysis.setUpdateType("CREATE VIEW");
analysis.setUpdateTarget(
catalogHandle.getVersion(),
viewName,
Optional.empty(),
Optional.of(queryScope.getRelationType().getVisibleFields().stream()
Expand Down Expand Up @@ -1217,7 +1224,7 @@ protected Scope visitTableExecute(TableExecute node, Optional<Scope> scope)
analysis.setTableExecuteHandle(executeHandle);

analysis.setUpdateType("ALTER TABLE EXECUTE");
analysis.setUpdateTarget(tableName, Optional.of(table), Optional.empty());
analysis.setUpdateTarget(executeHandle.getCatalogHandle().getVersion(), tableName, Optional.of(table), Optional.empty());

return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
}
Expand Down Expand Up @@ -1368,8 +1375,10 @@ protected Scope visitCreateMaterializedView(CreateMaterializedView node, Optiona

validateColumns(node, queryScope.getRelationType());

CatalogHandle catalogHandle = getRequiredCatalogHandle(metadata, session, node, viewName.getCatalogName());
analysis.setUpdateType("CREATE MATERIALIZED VIEW");
analysis.setUpdateTarget(
catalogHandle.getVersion(),
viewName,
Optional.empty(),
Optional.of(
Expand Down Expand Up @@ -3309,6 +3318,7 @@ protected Scope visitUpdate(Update update, Optional<Scope> scope)

analysis.setUpdateType("UPDATE");
analysis.setUpdateTarget(
handle.getCatalogHandle().getVersion(),
tableName,
Optional.of(table),
Optional.of(updatedColumnSchemas.stream()
Expand Down Expand Up @@ -3478,7 +3488,7 @@ else if (operation instanceof MergeInsert && caseColumnNames.isEmpty()) {
.collect(toImmutableList());

analysis.setUpdateType("MERGE");
analysis.setUpdateTarget(tableName, Optional.of(table), Optional.of(updatedColumns));
analysis.setUpdateTarget(targetTableHandle.getCatalogHandle().getVersion(), tableName, Optional.of(table), Optional.of(updatedColumns));
List<List<ColumnHandle>> mergeCaseColumnHandles = buildCaseColumnLists(merge, dataColumnSchemas, allColumnHandles);

createMergeAnalysis(table, targetTableHandle, tableSchema, targetTableScope, joinScope, mergeCaseColumnHandles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,15 @@ private Input createInput(Session session, TableHandle table, Set<Column> column
CatalogSchemaTableName tableName = metadata.getTableName(session, table);
SchemaTableName schemaTable = tableName.getSchemaTableName();
Optional<Object> inputMetadata = metadata.getInfo(session, table);
return new Input(tableName.getCatalogName(), schemaTable.getSchemaName(), schemaTable.getTableName(), inputMetadata, ImmutableList.copyOf(columns), fragmentId, planNodeId);
return new Input(
tableName.getCatalogName(),
table.getCatalogHandle().getVersion(),
schemaTable.getSchemaName(),
schemaTable.getTableName(),
inputMetadata,
ImmutableList.copyOf(columns),
fragmentId,
planNodeId);
}

private class Visitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
import io.trino.spi.connector.CatalogHandle.CatalogVersion;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
import org.testng.annotations.Test;
Expand All @@ -32,6 +33,7 @@ public void testRoundTrip()
{
Input expected = new Input(
"connectorId",
new CatalogVersion("default"),
"schema",
"table",
Optional.empty(),
Expand Down
Loading