Add SPI, syntax and execution for table procedures
This commit adds SPI and execution support for the new table procedures.

The new syntax extends the ALTER TABLE family, allowing for proper semantic
analysis of the target table (unlike CALL, where the table schema and table
name are passed as string literals via procedure arguments).

New syntax examples:
 * ALTER TABLE <table> EXECUTE
 * ALTER TABLE <table> EXECUTE(value1, value2, ...)
 * ALTER TABLE <table> EXECUTE(param1 => value1, param2 => value2, ...) WHERE ...
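
For illustration (the `optimize` procedure name and its property are
hypothetical, not part of this commit), an invocation of a connector-provided
table procedure could look like:
 * ALTER TABLE orders EXECUTE optimize(file_size_threshold => '100MB')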

The new table procedures allow for rewriting table data, which makes them
suitable for implementing data maintenance routines like:
 * compacting small files into larger ones for a Hive table
 * changing the file sorting or bucketing scheme of a table
 * Iceberg OPTIMIZE

The execution flow which does _not_ rewrite table data is not yet
implemented. It will be added as a follow-up; the procedures currently
available via `CALL` will then be migrated to the new mechanism.

Connectors expose table procedures to the engine via a set of new SPI
methods and classes:
 * io.trino.spi.connector.TableProcedureMetadata
 * io.trino.spi.connector.TableProcedureExecutionMode
 * io.trino.spi.connector.ConnectorTableExecuteHandle
 * io.trino.spi.connector.ConnectorMetadata#getTableHandleForExecute
 * io.trino.spi.connector.ConnectorMetadata#getLayoutForTableExecute
 * io.trino.spi.connector.ConnectorMetadata#beginTableExecute
 * io.trino.spi.connector.ConnectorMetadata#finishTableExecute
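
As a hedged sketch (not code from this commit): a connector might declare a
table procedure as below, assuming distributedWithFilteringAndRepartitioning()
is the execution mode for data-rewriting procedures; the "optimize" name and
"file_size_threshold" property are hypothetical.

import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.session.PropertyMetadata;

import java.util.List;
import java.util.Set;

import static io.trino.spi.connector.TableProcedureExecutionMode.distributedWithFilteringAndRepartitioning;
import static io.trino.spi.session.PropertyMetadata.stringProperty;

public class ExampleTableProcedures
{
    // Returned from Connector#getTableProcedures(); the engine registers the
    // procedure and its properties per catalog (see the registration diff below).
    public static Set<TableProcedureMetadata> tableProcedures()
    {
        return Set.of(new TableProcedureMetadata(
                "optimize", // hypothetical procedure name
                distributedWithFilteringAndRepartitioning(),
                List.<PropertyMetadata<?>>of(stringProperty(
                        "file_size_threshold", // hypothetical property
                        "Compact files smaller than the given threshold",
                        "100MB",
                        false))));
    }
}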
losipiuk committed Oct 25, 2021
1 parent c558ba6 commit 36ff8b5
Showing 95 changed files with 2,328 additions and 53 deletions.
io/trino/connector/ConnectorManager.java
@@ -48,6 +48,7 @@
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.session.PropertyMetadata;
@@ -300,6 +301,8 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
.ifPresent(partitioningProvider -> nodePartitioningManager.addPartitioningProvider(catalogName, partitioningProvider));

metadataManager.getProcedureRegistry().addProcedures(catalogName, connector.getProcedures());
Set<TableProcedureMetadata> tableProcedures = connector.getTableProcedures();
metadataManager.getTableProcedureRegistry().addTableProcedures(catalogName, tableProcedures);

connector.getAccessControl()
.ifPresent(accessControl -> accessControlManager.addCatalogAccessControl(catalogName, accessControl));
@@ -309,6 +312,9 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
metadataManager.getColumnPropertyManager().addProperties(catalogName, connector.getColumnProperties());
metadataManager.getSchemaPropertyManager().addProperties(catalogName, connector.getSchemaProperties());
metadataManager.getAnalyzePropertyManager().addProperties(catalogName, connector.getAnalyzeProperties());
for (TableProcedureMetadata tableProcedure : tableProcedures) {
metadataManager.getTableProceduresPropertyManager().addProperties(catalogName, tableProcedure.getName(), tableProcedure.getProperties());
}
metadataManager.getSessionPropertyManager().addConnectorSessionProperties(catalogName, connector.getSessionProperties());
}

@@ -333,12 +339,14 @@ private synchronized void removeConnectorInternal(CatalogName catalogName)
indexManager.removeIndexProvider(catalogName);
nodePartitioningManager.removePartitioningProvider(catalogName);
metadataManager.getProcedureRegistry().removeProcedures(catalogName);
metadataManager.getTableProcedureRegistry().removeProcedures(catalogName);
accessControlManager.removeCatalogAccessControl(catalogName);
metadataManager.getTablePropertyManager().removeProperties(catalogName);
metadataManager.getMaterializedViewPropertyManager().removeProperties(catalogName);
metadataManager.getColumnPropertyManager().removeProperties(catalogName);
metadataManager.getSchemaPropertyManager().removeProperties(catalogName);
metadataManager.getAnalyzePropertyManager().removeProperties(catalogName);
metadataManager.getTableProceduresPropertyManager().removeProperties(catalogName);
metadataManager.getSessionPropertyManager().removeConnectorSessionProperties(catalogName);

MaterializedConnector materializedConnector = connectors.remove(catalogName);
@@ -402,6 +410,7 @@ private static class MaterializedConnector
private final Connector connector;
private final Set<SystemTable> systemTables;
private final Set<Procedure> procedures;
private final Set<TableProcedureMetadata> tableProcedures;
private final Optional<ConnectorSplitManager> splitManager;
private final Optional<ConnectorPageSourceProvider> pageSourceProvider;
private final Optional<ConnectorPageSinkProvider> pageSinkProvider;
@@ -429,6 +438,10 @@ public MaterializedConnector(CatalogName catalogName, Connector connector)
requireNonNull(procedures, format("Connector '%s' returned a null procedures set", catalogName));
this.procedures = ImmutableSet.copyOf(procedures);

Set<TableProcedureMetadata> tableProcedures = connector.getTableProcedures();
requireNonNull(procedures, format("Connector '%s' returned a null table procedures set", catalogName));
this.tableProcedures = ImmutableSet.copyOf(tableProcedures);

ConnectorSplitManager splitManager = null;
try {
splitManager = connector.getSplitManager();
@@ -539,6 +552,11 @@ public Set<Procedure> getProcedures()
return procedures;
}

public Set<TableProcedureMetadata> getTableProcedures()
{
return tableProcedures;
}

public Optional<ConnectorSplitManager> getSplitManager()
{
return splitManager;
io/trino/execution/SqlQueryExecution.java
@@ -132,6 +132,7 @@ public class SqlQueryExecution
private final StatsCalculator statsCalculator;
private final CostCalculator costCalculator;
private final DynamicFilterService dynamicFilterService;
private final TableExecuteContextManager tableExecuteContextManager;

private SqlQueryExecution(
PreparedQuery preparedQuery,
@@ -159,7 +160,8 @@ private SqlQueryExecution(
StatsCalculator statsCalculator,
CostCalculator costCalculator,
DynamicFilterService dynamicFilterService,
WarningCollector warningCollector)
WarningCollector warningCollector,
TableExecuteContextManager tableExecuteContextManager)
{
try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
this.slug = requireNonNull(slug, "slug is null");
@@ -180,6 +182,7 @@ private SqlQueryExecution(
this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null");
this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");

checkArgument(scheduleSplitBatchSize > 0, "scheduleSplitBatchSize must be greater than 0");
this.scheduleSplitBatchSize = scheduleSplitBatchSize;
@@ -195,6 +198,8 @@
}
unregisterDynamicFilteringQuery(
dynamicFilterService.getDynamicFilteringStats(stateMachine.getQueryId(), stateMachine.getSession()));

tableExecuteContextManager.unregisterTableExecuteContextForQuery(stateMachine.getQueryId());
});

// when the query finishes cache the final query info, and clear the reference to the output stage
@@ -423,6 +428,8 @@ public void start()
}
}

tableExecuteContextManager.registerTableExecuteContextForQuery(getQueryId());

if (!stateMachine.transitionToStarting()) {
// query already started or finished
return;
Expand Down Expand Up @@ -544,7 +551,8 @@ private void planDistribution(PlanRoot plan)
nodeTaskMap,
executionPolicy,
schedulerStats,
dynamicFilterService);
dynamicFilterService,
tableExecuteContextManager);

queryScheduler.set(scheduler);

@@ -741,6 +749,7 @@ public static class SqlQueryExecutionFactory
private final StatsCalculator statsCalculator;
private final CostCalculator costCalculator;
private final DynamicFilterService dynamicFilterService;
private final TableExecuteContextManager tableExecuteContextManager;

@Inject
SqlQueryExecutionFactory(
@@ -765,7 +774,8 @@ public static class SqlQueryExecutionFactory
SplitSchedulerStats schedulerStats,
StatsCalculator statsCalculator,
CostCalculator costCalculator,
DynamicFilterService dynamicFilterService)
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager)
{
requireNonNull(config, "config is null");
this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
@@ -790,6 +800,7 @@ public static class SqlQueryExecutionFactory
this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null");
this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
}

@Override
@@ -829,7 +840,8 @@ public QueryExecution createQueryExecution(
statsCalculator,
costCalculator,
dynamicFilterService,
warningCollector);
warningCollector,
tableExecuteContextManager);
}
}
}
io/trino/execution/TableExecuteContext.java
@@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution;

import com.google.common.collect.ImmutableList;

import javax.annotation.concurrent.GuardedBy;

import java.util.List;

import static java.util.Objects.requireNonNull;

public class TableExecuteContext
{
@GuardedBy("this")
private List<Object> splitsInfo;

public synchronized void setSplitsInfo(List<Object> splitsInfo)
{
requireNonNull(splitsInfo, "splitsInfo is null");
if (this.splitsInfo != null) {
throw new IllegalStateException("splitsInfo already set to " + this.splitsInfo);
}
this.splitsInfo = ImmutableList.copyOf(splitsInfo);
}

public synchronized List<Object> getSplitsInfo()
{
if (splitsInfo == null) {
throw new IllegalStateException("splitsInfo not set yet");
}
return splitsInfo;
}
}
io/trino/execution/TableExecuteContextManager.java
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution;

import io.trino.spi.QueryId;

import javax.annotation.concurrent.ThreadSafe;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@ThreadSafe
public class TableExecuteContextManager
{
private final ConcurrentMap<QueryId, TableExecuteContext> contexts = new ConcurrentHashMap<>();

public void registerTableExecuteContextForQuery(QueryId queryId)
{
TableExecuteContext newContext = new TableExecuteContext();
if (contexts.putIfAbsent(queryId, newContext) != null) {
throw new IllegalStateException("TableExecuteContext already registered for query " + queryId);
}
}

public void unregisterTableExecuteContextForQuery(QueryId queryId)
{
contexts.remove(queryId);
}

public TableExecuteContext getTableExecuteContextForQuery(QueryId queryId)
{
TableExecuteContext context = contexts.get(queryId);
if (context == null) {
throw new IllegalStateException("TableExecuteContext not registered for query " + queryId);
}
return context;
}
}
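
A minimal usage sketch of the two new classes above, mirroring how this commit
wires them together (the query id and split info values are placeholders):

import io.trino.execution.TableExecuteContextManager;
import io.trino.spi.QueryId;

import java.util.List;

public class TableExecuteContextLifecycle
{
    public static void main(String[] args)
    {
        TableExecuteContextManager manager = new TableExecuteContextManager();
        QueryId queryId = new QueryId("query_1"); // placeholder query id

        // SqlQueryExecution#start() registers a context for the query.
        manager.registerTableExecuteContextForQuery(queryId);

        // SourcePartitionedScheduler publishes the split source's info once all
        // splits have been scheduled; setSplitsInfo may be called at most once.
        manager.getTableExecuteContextForQuery(queryId)
                .setSplitsInfo(List.<Object>of("split-info"));

        // At query finish the engine can read the info back, e.g. to hand it to
        // the connector when completing a table execute operation.
        List<Object> splitsInfo = manager.getTableExecuteContextForQuery(queryId).getSplitsInfo();
        System.out.println(splitsInfo);

        // The query-finished listener unregisters the context.
        manager.unregisterTableExecuteContextForQuery(queryId);
    }
}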
io/trino/execution/scheduler/FixedSourcePartitionedScheduler.java
@@ -21,6 +21,7 @@
import io.trino.execution.Lifespan;
import io.trino.execution.RemoteTask;
import io.trino.execution.SqlStageExecution;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.ScheduleResult.BlockedReason;
import io.trino.execution.scheduler.group.DynamicLifespanScheduler;
import io.trino.execution.scheduler.group.FixedLifespanScheduler;
@@ -75,13 +76,15 @@ public FixedSourcePartitionedScheduler(
OptionalInt concurrentLifespansPerTask,
NodeSelector nodeSelector,
List<ConnectorPartitionHandle> partitionHandles,
DynamicFilterService dynamicFilterService)
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager)
{
requireNonNull(stage, "stage is null");
requireNonNull(splitSources, "splitSources is null");
requireNonNull(bucketNodeMap, "bucketNodeMap is null");
checkArgument(!requireNonNull(nodes, "nodes is null").isEmpty(), "nodes is empty");
requireNonNull(partitionHandles, "partitionHandles is null");
requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");

this.stage = stage;
this.nodes = ImmutableList.copyOf(nodes);
@@ -119,6 +122,7 @@ public FixedSourcePartitionedScheduler(
Math.max(splitBatchSize / concurrentLifespans, 1),
groupedExecutionForScanNode,
dynamicFilterService,
tableExecuteContextManager,
() -> true);

if (stageExecutionDescriptor.isStageGroupedExecution() && !groupedExecutionForScanNode) {
io/trino/execution/scheduler/SourcePartitionedScheduler.java
@@ -23,6 +23,8 @@
import io.trino.execution.Lifespan;
import io.trino.execution.RemoteTask;
import io.trino.execution.SqlStageExecution;
import io.trino.execution.TableExecuteContext;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.FixedSourcePartitionedScheduler.BucketedSplitPlacementPolicy;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
@@ -40,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;
@@ -96,6 +99,7 @@ private enum State
private final PlanNodeId partitionedNode;
private final boolean groupedExecution;
private final DynamicFilterService dynamicFilterService;
private final TableExecuteContextManager tableExecuteContextManager;
private final BooleanSupplier anySourceTaskBlocked;

private final Map<Lifespan, ScheduleGroup> scheduleGroups = new HashMap<>();
@@ -112,13 +116,15 @@ private SourcePartitionedScheduler(
int splitBatchSize,
boolean groupedExecution,
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager,
BooleanSupplier anySourceTaskBlocked)
{
this.stage = requireNonNull(stage, "stage is null");
this.partitionedNode = requireNonNull(partitionedNode, "partitionedNode is null");
this.splitSource = requireNonNull(splitSource, "splitSource is null");
this.splitPlacementPolicy = requireNonNull(splitPlacementPolicy, "splitPlacementPolicy is null");
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
this.anySourceTaskBlocked = requireNonNull(anySourceTaskBlocked, "anySourceTaskBlocked is null");

checkArgument(splitBatchSize > 0, "splitBatchSize must be at least one");
@@ -146,6 +152,7 @@ public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(
SplitPlacementPolicy splitPlacementPolicy,
int splitBatchSize,
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager,
BooleanSupplier anySourceTaskBlocked)
{
SourcePartitionedScheduler sourcePartitionedScheduler = new SourcePartitionedScheduler(
Expand All @@ -156,6 +163,7 @@ public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(
splitBatchSize,
false,
dynamicFilterService,
tableExecuteContextManager,
anySourceTaskBlocked);
sourcePartitionedScheduler.startLifespan(Lifespan.taskWide(), NOT_PARTITIONED);
sourcePartitionedScheduler.noMoreLifespans();
@@ -197,6 +205,7 @@ public static SourceScheduler newSourcePartitionedSchedulerAsSourceScheduler(
int splitBatchSize,
boolean groupedExecution,
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager,
BooleanSupplier anySourceTaskBlocked)
{
return new SourcePartitionedScheduler(
@@ -207,6 +216,7 @@
splitBatchSize,
groupedExecution,
dynamicFilterService,
tableExecuteContextManager,
anySourceTaskBlocked);
}

@@ -357,6 +367,16 @@ else if (pendingSplits.isEmpty()) {
throw new IllegalStateException("At least 1 split should have been scheduled for this plan node");
case SPLITS_ADDED:
state = State.NO_MORE_SPLITS;

Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();

// Here we assume that a non-empty tableExecuteSplitsInfo can only be returned for queries which use a single split source.
// TODO support grouped execution
tableExecuteSplitsInfo.ifPresent(info -> {
TableExecuteContext tableExecuteContext = tableExecuteContextManager.getTableExecuteContextForQuery(stage.getStageId().getQueryId());
tableExecuteContext.setSplitsInfo(info);
});

splitSource.close();
// fall through
case NO_MORE_SPLITS: