Skip to content

Commit

Permalink
Implement Delta Lake views support
Browse files Browse the repository at this point in the history
  • Loading branch information
mdesmet authored and ebyhr committed Dec 8, 2022
1 parent 0570fde commit 6283fef
Show file tree
Hide file tree
Showing 11 changed files with 424 additions and 9 deletions.
4 changes: 4 additions & 0 deletions plugin/trino-delta-lake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,12 @@
<exclude>**/TestDeltaLakeAdlsConnectorSmokeTest.java</exclude>
<exclude>**/TestDeltaLakeGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeCleanUpGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeSharedGlueMetastoreViews.java</exclude>
<exclude>**/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java</exclude>
<exclude>**/TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeRenameToWithGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeRegisterTableProcedureWithGlue.java</exclude>
<exclude>**/TestDeltaLakeViewsGlueMetastore.java</exclude>
<exclude>**/TestDeltaLakeGcsConnectorSmokeTest.java</exclude>
</excludes>
</configuration>
Expand Down Expand Up @@ -474,10 +476,12 @@
<include>**/TestDeltaLakeAdlsConnectorSmokeTest.java</include>
<include>**/TestDeltaLakeGlueMetastore.java</include>
<include>**/TestDeltaLakeCleanUpGlueMetastore.java</include>
<include>**/TestDeltaLakeSharedGlueMetastoreViews.java</include>
<include>**/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java</include>
<include>**/TestDeltaLakeTableWithCustomLocationUsingGlueMetastore.java</include>
<include>**/TestDeltaLakeRenameToWithGlueMetastore.java</include>
<include>**/TestDeltaLakeRegisterTableProcedureWithGlue.java</include>
<include>**/TestDeltaLakeViewsGlueMetastore.java</include>
</includes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.SchemaAlreadyExistsException;
import io.trino.plugin.hive.TableAlreadyExistsException;
import io.trino.plugin.hive.TrinoViewHiveMetastore;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HivePrincipal;
Expand Down Expand Up @@ -82,6 +83,7 @@
import io.trino.spi.connector.ConnectorTableLayout;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
Expand Down Expand Up @@ -286,6 +288,7 @@ public class DeltaLakeMetadata
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
private final AccessControlMetadata accessControlMetadata;
private final TrinoViewHiveMetastore trinoViewHiveMetastore;
private final CheckpointWriterManager checkpointWriterManager;
private final long defaultCheckpointInterval;
private final int domainCompactionThreshold;
Expand All @@ -309,6 +312,7 @@ public DeltaLakeMetadata(
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
AccessControlMetadata accessControlMetadata,
TrinoViewHiveMetastore trinoViewHiveMetastore,
int domainCompactionThreshold,
boolean unsafeWritesEnabled,
JsonCodec<DataFileInfo> dataFileInfoCodec,
Expand All @@ -329,6 +333,7 @@ public DeltaLakeMetadata(
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.accessControlMetadata = requireNonNull(accessControlMetadata, "accessControlMetadata is null");
this.trinoViewHiveMetastore = requireNonNull(trinoViewHiveMetastore, "trinoViewHiveMetastore is null");
this.domainCompactionThreshold = domainCompactionThreshold;
this.unsafeWritesEnabled = unsafeWritesEnabled;
this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");
Expand Down Expand Up @@ -2077,6 +2082,36 @@ public Map<String, Object> getSchemaProperties(ConnectorSession session, Catalog
return db.map(DeltaLakeSchemaProperties::fromDatabase).orElseThrow(() -> new SchemaNotFoundException(schema));
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace)
{
// Views are stored in the Hive metastore; delegate to the shared Hive-compatible view store.
// 'replace' semantics (overwrite an existing view) are handled entirely by the delegate.
trinoViewHiveMetastore.createView(session, viewName, definition, replace);
}

@Override
public void dropView(ConnectorSession session, SchemaTableName viewName)
{
// Session is not needed by the delegate; the view is removed from the Hive metastore directly.
trinoViewHiveMetastore.dropView(viewName);
}

@Override
public List<SchemaTableName> listViews(ConnectorSession session, Optional<String> schemaName)
{
// Lists views from the Hive metastore; empty schemaName means all schemas (delegate's contract).
return trinoViewHiveMetastore.listViews(schemaName);
}

@Override
public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession session, Optional<String> schemaName)
{
// Bulk view-definition lookup, delegated to the Hive-metastore-backed view store.
return trinoViewHiveMetastore.getViews(schemaName);
}

@Override
public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
// Returns Optional.empty() rather than throwing when the view does not exist (delegate's contract).
return trinoViewHiveMetastore.getView(viewName);
}

@Override
public void createRole(ConnectorSession session, String role, Optional<TrinoPrincipal> grantor)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.TrinoViewHiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.plugin.hive.security.AccessControlMetadata;
import io.trino.spi.NodeManager;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;
Expand Down Expand Up @@ -58,6 +61,7 @@ public class DeltaLakeMetadataFactory
private final boolean useUniqueTableLocation;

private final boolean allowManagedTableRename;
private final String trinoVersion;

@Inject
public DeltaLakeMetadataFactory(
Expand All @@ -76,7 +80,8 @@ public DeltaLakeMetadataFactory(
CheckpointWriterManager checkpointWriterManager,
DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider,
CachingExtendedStatisticsAccess statisticsAccess,
@AllowDeltaLakeManagedTableRename boolean allowManagedTableRename)
@AllowDeltaLakeManagedTableRename boolean allowManagedTableRename,
NodeVersion nodeVersion)
{
this.hiveMetastoreFactory = requireNonNull(hiveMetastoreFactory, "hiveMetastore is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
Expand All @@ -99,6 +104,7 @@ public DeltaLakeMetadataFactory(
this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback();
this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation();
this.allowManagedTableRename = allowManagedTableRename;
this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
}

public DeltaLakeMetadata create(ConnectorIdentity identity)
Expand All @@ -107,18 +113,25 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
CachingHiveMetastore cachingHiveMetastore = memoizeMetastore(
hiveMetastoreFactory.createMetastore(Optional.of(identity)),
perTransactionMetastoreCacheMaximumSize);
AccessControlMetadata accessControlMetadata = accessControlMetadataFactory.create(cachingHiveMetastore);
HiveMetastoreBackedDeltaLakeMetastore deltaLakeMetastore = new HiveMetastoreBackedDeltaLakeMetastore(
cachingHiveMetastore,
transactionLogAccess,
typeManager,
statisticsAccess,
fileSystemFactory);
TrinoViewHiveMetastore trinoViewHiveMetastore = new TrinoViewHiveMetastore(
cachingHiveMetastore,
accessControlMetadata.isUsingSystemSecurity(),
trinoVersion,
"Trino Delta Lake connector");
return new DeltaLakeMetadata(
deltaLakeMetastore,
fileSystemFactory,
hdfsEnvironment,
typeManager,
accessControlMetadataFactory.create(cachingHiveMetastore),
accessControlMetadata,
trinoViewHiveMetastore,
domainCompactionThreshold,
unsafeWritesEnabled,
dataFileInfoCodec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ protected QueryRunner createQueryRunner()
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
switch (connectorBehavior) {
case SUPPORTS_CREATE_VIEW:
return true;

case SUPPORTS_RENAME_SCHEMA:
return false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_DELETE:
case SUPPORTS_UPDATE:
case SUPPORTS_MERGE:
case SUPPORTS_CREATE_VIEW:
return true;

default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule;
import io.trino.plugin.hive.TestingHivePlugin;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static com.google.inject.util.Modules.EMPTY_MODULE;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;

/**
 * Tests querying views on a schema which has a mix of Hive and Delta Lake tables.
 * <p>
 * Both catalogs are backed by the same {@code HiveMetastore} instance, so a view created
 * through one connector must be listed and readable through the other. Subclasses supply
 * the concrete metastore implementation via {@link #createTestMetastore(String)}.
 */
public abstract class BaseDeltaLakeSharedMetastoreViewsTest
        extends AbstractTestQueryFramework
{
    protected static final String DELTA_CATALOG_NAME = "delta_lake";
    protected static final String HIVE_CATALOG_NAME = "hive";
    // Random suffix keeps runs isolated when the metastore outlives a single test execution
    protected static final String SCHEMA = "test_shared_schema_views_" + randomNameSuffix();

    private String dataDirectory;
    private HiveMetastore metastore;

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        Session session = testSessionBuilder()
                .setCatalog(DELTA_CATALOG_NAME)
                .setSchema(SCHEMA)
                .build();
        DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build();

        this.dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_data").toString();
        this.metastore = createTestMetastore(dataDirectory);

        // Both catalogs are wired to the same metastore instance so view metadata is shared
        queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), EMPTY_MODULE));
        queryRunner.createCatalog(DELTA_CATALOG_NAME, "delta-lake");

        queryRunner.installPlugin(new TestingHivePlugin(metastore));

        ImmutableMap<String, String> hiveProperties = ImmutableMap.<String, String>builder()
                .put("hive.allow-drop-table", "true")
                .buildOrThrow();

        queryRunner.createCatalog(HIVE_CATALOG_NAME, "hive", hiveProperties);
        queryRunner.execute("CREATE SCHEMA " + SCHEMA);

        return queryRunner;
    }

    /**
     * Creates the metastore shared by both catalogs; implementations choose the backing
     * store (file-based, Glue, ...) rooted at {@code dataDirectory}.
     */
    protected abstract HiveMetastore createTestMetastore(String dataDirectory);

    @Test
    public void testViewWithLiteralColumnCreatedInDeltaLakeIsReadableInHive()
    {
        String deltaViewName = "delta_view_" + randomNameSuffix();
        String deltaView = format("%s.%s.%s", DELTA_CATALOG_NAME, SCHEMA, deltaViewName);
        String deltaViewOnHiveCatalog = format("%s.%s.%s", HIVE_CATALOG_NAME, SCHEMA, deltaViewName);
        try {
            assertUpdate(format("CREATE VIEW %s AS SELECT 1 bee", deltaView));
            assertQuery(format("SELECT * FROM %s", deltaView), "VALUES 1");
            // Same view must be readable through the Hive catalog and reported as a VIEW there
            assertQuery(format("SELECT * FROM %s", deltaViewOnHiveCatalog), "VALUES 1");
            assertQuery(format("SELECT table_type FROM %s.information_schema.tables WHERE table_name = '%s' AND table_schema='%s'", HIVE_CATALOG_NAME, deltaViewName, SCHEMA), "VALUES 'VIEW'");
        }
        finally {
            assertUpdate(format("DROP VIEW IF EXISTS %s", deltaView));
        }
    }

    @Test
    public void testViewOnDeltaLakeTableCreatedInDeltaLakeIsReadableInHive()
    {
        String deltaTableName = "delta_table_" + randomNameSuffix();
        String deltaTable = format("%s.%s.%s", DELTA_CATALOG_NAME, SCHEMA, deltaTableName);
        String deltaViewName = "delta_view_" + randomNameSuffix();
        String deltaView = format("%s.%s.%s", DELTA_CATALOG_NAME, SCHEMA, deltaViewName);
        String deltaViewOnHiveCatalog = format("%s.%s.%s", HIVE_CATALOG_NAME, SCHEMA, deltaViewName);
        try {
            assertUpdate(format("CREATE TABLE %s AS SELECT 1 bee", deltaTable), 1);
            assertUpdate(format("CREATE VIEW %s AS SELECT * from %s", deltaView, deltaTable));
            assertQuery(format("SELECT * FROM %s", deltaView), "VALUES 1");
            assertQuery(format("SELECT * FROM %s", deltaViewOnHiveCatalog), "VALUES 1");
            assertQuery(format("SELECT table_type FROM %s.information_schema.tables WHERE table_name = '%s' AND table_schema='%s'", HIVE_CATALOG_NAME, deltaViewName, SCHEMA), "VALUES 'VIEW'");
        }
        finally {
            assertUpdate(format("DROP TABLE IF EXISTS %s", deltaTable));
            assertUpdate(format("DROP VIEW IF EXISTS %s", deltaView));
        }
    }

    @Test
    public void testViewWithLiteralColumnCreatedInHiveIsReadableInDeltaLake()
    {
        String trinoViewOnHiveName = "trino_view_on_hive_" + randomNameSuffix();
        String trinoViewOnHive = format("%s.%s.%s", HIVE_CATALOG_NAME, SCHEMA, trinoViewOnHiveName);
        String trinoViewOnHiveOnDeltaCatalog = format("%s.%s.%s", DELTA_CATALOG_NAME, SCHEMA, trinoViewOnHiveName);
        try {
            assertUpdate(format("CREATE VIEW %s AS SELECT 1 bee", trinoViewOnHive));
            assertQuery(format("SELECT * FROM %s", trinoViewOnHive), "VALUES 1");
            assertQuery(format("SELECT * FROM %s", trinoViewOnHiveOnDeltaCatalog), "VALUES 1");
            assertQuery(format("SELECT table_type FROM %s.information_schema.tables WHERE table_name = '%s' AND table_schema='%s'", HIVE_CATALOG_NAME, trinoViewOnHiveName, SCHEMA), "VALUES 'VIEW'");
        }
        finally {
            assertUpdate(format("DROP VIEW IF EXISTS %s", trinoViewOnHive));
        }
    }

    @Test
    public void testViewOnHiveTableCreatedInHiveIsReadableInDeltaLake()
    {
        String hiveTableName = "hive_table_" + randomNameSuffix();
        String hiveTable = format("%s.%s.%s", HIVE_CATALOG_NAME, SCHEMA, hiveTableName);
        String trinoViewOnHiveName = "trino_view_on_hive_" + randomNameSuffix();
        String trinoViewOnHive = format("%s.%s.%s", HIVE_CATALOG_NAME, SCHEMA, trinoViewOnHiveName);
        String trinoViewOnHiveOnDeltaCatalog = format("%s.%s.%s", DELTA_CATALOG_NAME, SCHEMA, trinoViewOnHiveName);
        try {
            assertUpdate(format("CREATE TABLE %s AS SELECT 1 bee", hiveTable), 1);
            // The view must select from the Hive table: the previous "SELECT 1 bee" definition
            // never referenced hiveTable, so the test did not actually cover a view over a Hive
            // table (compare the symmetric Delta Lake test above). The table holds exactly one
            // row with value 1, so the assertions below are unchanged.
            assertUpdate(format("CREATE VIEW %s AS SELECT * FROM %s", trinoViewOnHive, hiveTable));
            assertQuery(format("SELECT * FROM %s", trinoViewOnHive), "VALUES 1");
            assertQuery(format("SELECT * FROM %s", trinoViewOnHiveOnDeltaCatalog), "VALUES 1");
            assertQuery(format("SELECT table_type FROM %s.information_schema.tables WHERE table_name = '%s' AND table_schema='%s'", DELTA_CATALOG_NAME, trinoViewOnHiveName, SCHEMA), "VALUES 'VIEW'");
        }
        finally {
            assertUpdate(format("DROP TABLE IF EXISTS %s", hiveTable));
            assertUpdate(format("DROP VIEW IF EXISTS %s", trinoViewOnHive));
        }
    }

    @AfterClass(alwaysRun = true)
    public void cleanup()
            throws IOException
    {
        // metastore may be null if createQueryRunner failed before assigning it
        if (metastore != null) {
            metastore.dropDatabase(SCHEMA, false);
            deleteRecursively(Path.of(dataDirectory), ALLOW_INSECURE);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import io.trino.plugin.hive.metastore.HiveMetastore;

import java.io.File;

import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;

/**
 * Runs the shared-metastore view tests against a file-based Hive metastore.
 */
public class TestDeltaLakeSharedFileMetastoreViews
        extends BaseDeltaLakeSharedMetastoreViewsTest
{
    @Override
    protected HiveMetastore createTestMetastore(String dataDirectory)
    {
        // Back both catalogs with a testing file metastore rooted at the runner's data directory
        File metastoreDirectory = new File(dataDirectory);
        return createTestingFileHiveMetastore(metastoreDirectory);
    }
}
Loading

0 comments on commit 6283fef

Please sign in to comment.