Revert "Remove usages of Hadoop Path for Hive LocationService" #17947

Closed · wants to merge 1 commit
HiveLocationService.java
@@ -14,7 +14,6 @@
package io.trino.plugin.hive;

import com.google.inject.Inject;
import io.trino.filesystem.Location;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.LocationHandle.WriteMode;
@@ -57,32 +56,32 @@ public HiveLocationService(HdfsEnvironment hdfsEnvironment, HiveConfig hiveConfi
}

@Override
public Location forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName)
public Path forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName)
{
HdfsContext context = new HdfsContext(session);
Location targetPath = getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName);
Path targetPath = getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName);

// verify the target directory for table
if (pathExists(context, hdfsEnvironment, new Path(targetPath.toString()))) {
if (pathExists(context, hdfsEnvironment, targetPath)) {
throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format("Target directory for table '%s.%s' already exists: %s", schemaName, tableName, targetPath));
}
return targetPath;
}

@Override
public LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional<Location> externalLocation)
public LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional<Path> externalLocation)
{
HdfsContext context = new HdfsContext(session);
Location targetPath = externalLocation.orElseGet(() -> getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName));
Path targetPath = externalLocation.orElseGet(() -> getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName));

// verify the target directory for the table
if (pathExists(context, hdfsEnvironment, new Path(targetPath.toString()))) {
if (pathExists(context, hdfsEnvironment, targetPath)) {
throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format("Target directory for table '%s.%s' already exists: %s", schemaName, tableName, targetPath));
}

// TODO detect when existing table's location is on a different file system than the temporary directory
if (shouldUseTemporaryDirectory(context, new Path(targetPath.toString()), externalLocation.isPresent())) {
Location writePath = createTemporaryPath(context, hdfsEnvironment, new Path(targetPath.toString()), temporaryStagingDirectoryPath);
Path writePath = createTemporaryPath(context, hdfsEnvironment, new Path(targetPath.toString()), temporaryStagingDirectoryPath);
return new LocationHandle(targetPath, writePath, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
}
return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_NEW_DIRECTORY);
@@ -92,10 +91,10 @@ public LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetasto
public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table)
{
HdfsContext context = new HdfsContext(session);
Location targetPath = Location.of(table.getStorage().getLocation());
Path targetPath = new Path(table.getStorage().getLocation());

if (shouldUseTemporaryDirectory(context, new Path(targetPath.toString()), false) && !isTransactionalTable(table.getParameters())) {
Location writePath = createTemporaryPath(context, hdfsEnvironment, new Path(targetPath.toString()), temporaryStagingDirectoryPath);
Path writePath = createTemporaryPath(context, hdfsEnvironment, targetPath, temporaryStagingDirectoryPath);
return new LocationHandle(targetPath, writePath, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
}
return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_EXISTING_DIRECTORY);
@@ -105,7 +104,7 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore,
public LocationHandle forOptimize(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table)
{
// For OPTIMIZE write result files directly to table directory; that is needed by the commit logic in HiveMetadata#finishTableExecute
Location targetPath = Location.of(table.getStorage().getLocation());
Path targetPath = new Path(table.getStorage().getLocation());
return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_EXISTING_DIRECTORY);
}
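
This file's hunks swap io.trino.filesystem.Location for org.apache.hadoop.fs.Path and, wherever one API still needs the other type, bridge through the string form. A minimal sketch of that conversion pattern (class and method names below are illustrative, not from the PR):

import io.trino.filesystem.Location;
import org.apache.hadoop.fs.Path;

final class LocationPathBridge
{
    private LocationPathBridge() {}

    // Trino's file system API appends a child segment with appendPath(...)
    static Location childLocation(Location parent, String name)
    {
        return parent.appendPath(name);
    }

    // The Hadoop API expresses the same thing with the two-argument Path constructor
    static Path childPath(Path parent, String name)
    {
        return new Path(parent, name);
    }

    // Crossing the boundary in either direction goes through toString(),
    // the pattern visible throughout this diff
    static Path toHadoopPath(Location location)
    {
        return new Path(location.toString());
    }

    static Location toTrinoLocation(Path path)
    {
        return Location.of(path.toString());
    }
}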

@@ -141,23 +140,27 @@ public WriteInfo getPartitionWriteInfo(LocationHandle locationHandle, Optional<P
if (partition.isPresent()) {
// existing partition
WriteMode writeMode = locationHandle.getWriteMode();
Location targetPath = Location.of(partition.get().getStorage().getLocation());
Location writePath = getPartitionWritePath(locationHandle, partitionName, writeMode, targetPath);
Path targetPath = new Path(partition.get().getStorage().getLocation());
Path writePath = getPartitionWritePath(locationHandle, partitionName, writeMode, targetPath);
return new WriteInfo(targetPath, writePath, writeMode);
}
// new partition
return new WriteInfo(
locationHandle.getTargetPath().appendPath(partitionName),
locationHandle.getWritePath().appendPath(partitionName),
new Path(locationHandle.getTargetPath(), partitionName),
new Path(locationHandle.getWritePath(), partitionName),
locationHandle.getWriteMode());
}

private static Location getPartitionWritePath(LocationHandle locationHandle, String partitionName, WriteMode writeMode, Location targetPath)
private Path getPartitionWritePath(LocationHandle locationHandle, String partitionName, WriteMode writeMode, Path targetPath)
{
return switch (writeMode) {
case STAGE_AND_MOVE_TO_TARGET_DIRECTORY -> locationHandle.getWritePath().appendPath(partitionName);
case DIRECT_TO_TARGET_EXISTING_DIRECTORY -> targetPath;
case DIRECT_TO_TARGET_NEW_DIRECTORY -> throw new UnsupportedOperationException(format("inserting into existing partition is not supported for %s", writeMode));
};
switch (writeMode) {
case STAGE_AND_MOVE_TO_TARGET_DIRECTORY:
return new Path(locationHandle.getWritePath(), partitionName);
case DIRECT_TO_TARGET_EXISTING_DIRECTORY:
return targetPath;
case DIRECT_TO_TARGET_NEW_DIRECTORY:
throw new UnsupportedOperationException(format("inserting into existing partition is not supported for %s", writeMode));
}
throw new UnsupportedOperationException("Unexpected write mode: " + writeMode);
}
}
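
The getPartitionWritePath hunk above also reverts a Java switch expression to a classic switch statement. A simplified side-by-side sketch of the two shapes, with parameter types normalized to Path; the trailing throw in the statement form exists only because the compiler cannot prove the enum switch exhaustive:

import io.trino.plugin.hive.LocationHandle.WriteMode;
import org.apache.hadoop.fs.Path;

final class PartitionWritePaths
{
    private PartitionWritePaths() {}

    // Pre-revert shape: a switch expression is exhaustive over the enum, so no trailing throw is needed
    static Path fromSwitchExpression(WriteMode writeMode, Path writePath, Path targetPath, String partitionName)
    {
        return switch (writeMode) {
            case STAGE_AND_MOVE_TO_TARGET_DIRECTORY -> new Path(writePath, partitionName);
            case DIRECT_TO_TARGET_EXISTING_DIRECTORY -> targetPath;
            case DIRECT_TO_TARGET_NEW_DIRECTORY -> throw new UnsupportedOperationException("inserting into existing partition is not supported for " + writeMode);
        };
    }

    // Post-revert shape: the statement form is not checked for exhaustiveness,
    // hence the extra throw after the switch
    static Path fromSwitchStatement(WriteMode writeMode, Path writePath, Path targetPath, String partitionName)
    {
        switch (writeMode) {
            case STAGE_AND_MOVE_TO_TARGET_DIRECTORY:
                return new Path(writePath, partitionName);
            case DIRECT_TO_TARGET_EXISTING_DIRECTORY:
                return targetPath;
            case DIRECT_TO_TARGET_NEW_DIRECTORY:
                throw new UnsupportedOperationException("inserting into existing partition is not supported for " + writeMode);
        }
        throw new UnsupportedOperationException("Unexpected write mode: " + writeMode);
    }
}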
HiveMetadata.java
@@ -996,7 +996,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
.collect(toImmutableList());
checkPartitionTypesSupported(partitionColumns);

Optional<Location> targetPath;
Optional<Path> targetPath;
boolean external;
String externalLocation = getExternalLocation(tableMetadata.getProperties());
if (externalLocation != null) {
@@ -1005,8 +1005,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
}

external = true;
targetPath = Optional.of(getValidatedExternalLocation(externalLocation));
checkExternalPath(new HdfsContext(session), new Path(targetPath.get().toString()));
targetPath = Optional.of(getExternalLocationAsPath(externalLocation));
checkExternalPath(new HdfsContext(session), targetPath.get());
}
else {
external = false;
@@ -1273,10 +1273,11 @@ private static String validateAvroSchemaLiteral(String avroSchemaLiteral)
}
}

private static Location getValidatedExternalLocation(String location)
private static Path getExternalLocationAsPath(String location)
{
try {
return Location.of(location);
Location.of(location); // Calling just for validation
Member Author commented: testTableLocationTopOfTheBucket doesn't pass without this line.

return new Path(location);
}
catch (IllegalArgumentException e) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "External location is not a valid file system URI: " + location, e);
@@ -1315,7 +1316,7 @@ private static Table buildTableObject(
List<String> partitionedBy,
Optional<HiveBucketProperty> bucketProperty,
Map<String, String> additionalTableParameters,
Optional<Location> targetPath,
Optional<Path> targetPath,
boolean external,
String prestoVersion,
boolean usingSystemSecurity)
@@ -1606,8 +1607,8 @@ private static List<String> canonicalizePartitionValues(String partitionName, Li
@Override
public HiveOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
Optional<Location> externalLocation = Optional.ofNullable(getExternalLocation(tableMetadata.getProperties()))
.map(HiveMetadata::getValidatedExternalLocation);
Optional<Path> externalLocation = Optional.ofNullable(getExternalLocation(tableMetadata.getProperties()))
.map(HiveMetadata::getExternalLocationAsPath);
if (!createsOfNonManagedTablesEnabled && externalLocation.isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "Creating non-managed Hive tables is disabled");
}
@@ -1694,7 +1695,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
retryMode != NO_RETRIES);

WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
metastore.declareIntentionToWrite(session, writeInfo.writeMode(), writeInfo.writePath(), schemaTableName);
metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), schemaTableName);

return result;
}
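
Throughout this file the revert also renames WriteInfo's record-style accessors (writePath(), targetPath(), writeMode()) back to JavaBean getters, which suggests WriteInfo returns to a plain getter-based value class holding Hadoop Paths. A hedged sketch reconstructed only from the call sites in this diff (argument checks, equals/hashCode omitted):

import io.trino.plugin.hive.LocationHandle.WriteMode;
import org.apache.hadoop.fs.Path;

public class WriteInfo
{
    private final Path targetPath;
    private final Path writePath;
    private final WriteMode writeMode;

    public WriteInfo(Path targetPath, Path writePath, WriteMode writeMode)
    {
        this.targetPath = targetPath;
        this.writePath = writePath;
        this.writeMode = writeMode;
    }

    public Path getTargetPath()
    {
        return targetPath;
    }

    public Path getWritePath()
    {
        return writePath;
    }

    public WriteMode getWriteMode()
    {
        return writeMode;
    }
}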
@@ -1720,7 +1721,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
handle.getPartitionedBy(),
handle.getBucketProperty(),
handle.getAdditionalTableParameters(),
Optional.of(writeInfo.targetPath()),
Optional.of(writeInfo.getTargetPath()),
handle.isExternal(),
prestoVersion,
accessControlMetadata.isUsingSystemSecurity());
@@ -1765,7 +1766,6 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
tableStatistics = new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of());
}

Optional<Path> writePath = Optional.of(new Path(writeInfo.writePath().toString()));
if (handle.getPartitionedBy().isEmpty()) {
List<String> fileNames;
if (partitionUpdates.isEmpty()) {
@@ -1775,10 +1775,10 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
else {
fileNames = getOnlyElement(partitionUpdates).getFileNames();
}
metastore.createTable(session, table, principalPrivileges, writePath, Optional.of(fileNames), false, tableStatistics, handle.isRetriesEnabled());
metastore.createTable(session, table, principalPrivileges, Optional.of(writeInfo.getWritePath()), Optional.of(fileNames), false, tableStatistics, handle.isRetriesEnabled());
}
else {
metastore.createTable(session, table, principalPrivileges, writePath, Optional.empty(), false, tableStatistics, false);
metastore.createTable(session, table, principalPrivileges, Optional.of(writeInfo.getWritePath()), Optional.empty(), false, tableStatistics, false);
}

if (!handle.getPartitionedBy().isEmpty()) {
@@ -2001,7 +2001,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg

LocationHandle locationHandle = locationService.forExistingTable(metastore, session, table);
WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
metastore.finishMerge(session, table.getDatabaseName(), table.getTableName(), writeInfo.writePath(), partitionMergeResults, partitions);
metastore.finishMerge(session, table.getDatabaseName(), table.getTableName(), writeInfo.getWritePath(), partitionMergeResults, partitions);
}

@Override
@@ -2072,7 +2072,7 @@ else if (isTransactional) {

WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
if (getInsertExistingPartitionsBehavior(session) == InsertExistingPartitionsBehavior.OVERWRITE
&& writeInfo.writeMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
&& writeInfo.getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
if (isTransactional) {
throw new TrinoException(NOT_SUPPORTED, "Overwriting existing partition in transactional tables doesn't support DIRECT_TO_TARGET_EXISTING_DIRECTORY write mode");
}
@@ -2082,7 +2082,7 @@ else if (isTransactional) {
throw new TrinoException(NOT_SUPPORTED, "Overwriting existing partition in non auto commit context doesn't support DIRECT_TO_TARGET_EXISTING_DIRECTORY write mode");
}
}
metastore.declareIntentionToWrite(session, writeInfo.writeMode(), writeInfo.writePath(), tableName);
metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), tableName);
return result;
}

@@ -2453,7 +2453,7 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
HiveTableHandle hiveSourceTableHandle = (HiveTableHandle) sourceTableHandle;

WriteInfo writeInfo = locationService.getQueryWriteInfo(hiveExecuteHandle.getLocationHandle());
String writeDeclarationId = metastore.declareIntentionToWrite(session, writeInfo.writeMode(), writeInfo.writePath(), hiveExecuteHandle.getSchemaTableName());
String writeDeclarationId = metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), hiveExecuteHandle.getSchemaTableName());

return new BeginTableExecuteResult<>(
hiveExecuteHandle
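
The getExternalLocationAsPath hunk above keeps Location.of(location) purely as a validation step; per the author's comment, testTableLocationTopOfTheBucket fails without it, presumably because Location.of rejects some locations (such as a bare bucket URI) that Hadoop's Path constructor accepts. A self-contained sketch of that validate-then-construct pattern; the static import of the error code and the wrapper class are assumptions added so the snippet compiles:

import io.trino.filesystem.Location;
import io.trino.spi.TrinoException;
import org.apache.hadoop.fs.Path;

import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; // assumed source of the error code

final class ExternalLocations
{
    private ExternalLocations() {}

    static Path getExternalLocationAsPath(String location)
    {
        try {
            // Stricter parse than new Path(...); kept only so invalid URIs fail here
            Location.of(location);
            return new Path(location);
        }
        catch (IllegalArgumentException e) {
            throw new TrinoException(INVALID_TABLE_PROPERTY, "External location is not a valid file system URI: " + location, e);
        }
    }
}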
HiveWriterFactory.java
@@ -256,17 +256,17 @@ public HiveWriterFactory(
this.dataColumns = dataColumns.build();
this.isCreateTransactionalTable = isCreateTable && transaction.isTransactional();

Location writePath;
Path writePath;
if (isCreateTable) {
this.table = null;
WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
checkArgument(writeInfo.writeMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY, "CREATE TABLE write mode cannot be DIRECT_TO_TARGET_EXISTING_DIRECTORY");
writePath = writeInfo.writePath();
checkArgument(writeInfo.getWriteMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY, "CREATE TABLE write mode cannot be DIRECT_TO_TARGET_EXISTING_DIRECTORY");
writePath = writeInfo.getWritePath();
}
else {
this.table = pageSinkMetadataProvider.getTable()
.orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, format("Table '%s.%s' was dropped during insert", schemaName, tableName)));
writePath = locationService.getQueryWriteInfo(locationHandle).writePath();
writePath = locationService.getQueryWriteInfo(locationHandle).getWritePath();
}

this.bucketCount = requireNonNull(bucketCount, "bucketCount is null");
@@ -289,12 +289,12 @@ public HiveWriterFactory(
.filter(entry -> entry.getValue() != null)
.collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().toString()));

Configuration conf = hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(writePath.toString()));
Configuration conf = hdfsEnvironment.getConfiguration(new HdfsContext(session), writePath);
this.conf = toJobConf(conf);

// make sure the FileSystem is created with the correct Configuration object
try {
hdfsEnvironment.getFileSystem(session.getIdentity(), new Path(writePath.toString()), conf);
hdfsEnvironment.getFileSystem(session.getIdentity(), writePath, conf);
}
catch (IOException e) {
throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed getting FileSystem: " + writePath, e);
@@ -357,18 +357,18 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
// a new partition in a new partitioned table
writeInfo = locationService.getPartitionWriteInfo(locationHandle, partition, partitionName.get());

if (!writeInfo.writeMode().isWritePathSameAsTargetPath()) {
if (!writeInfo.getWriteMode().isWritePathSameAsTargetPath()) {
// When target path is different from write path,
// verify that the target directory for the partition does not already exist
Location writeInfoTargetPath = writeInfo.targetPath();
Location writeInfoTargetPath = Location.of(writeInfo.getTargetPath().toString());
try {
if (fileSystem.directoryExists(writeInfoTargetPath).orElse(false)) {
throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format(
"Target directory for new partition '%s' of table '%s.%s' already exists: %s",
partitionName,
schemaName,
tableName,
writeInfo.targetPath()));
writeInfo.getTargetPath()));
}
}
catch (IOException e) {
@@ -482,7 +482,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt

int bucketToUse = bucketNumber.isEmpty() ? 0 : bucketNumber.getAsInt();

Location path = writeInfo.writePath();
Location path = Location.of(writeInfo.getWritePath().toString());
if (transaction.isAcidTransactionRunning() && transaction.getOperation() != CREATE_TABLE) {
String subdir = computeAcidSubdir(transaction);
String nameFormat = table != null && isInsertOnlyTable(table.getParameters()) ? "%05d_0" : "bucket_%05d";
@@ -632,8 +632,8 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
partitionName,
updateMode,
path.fileName(),
writeInfo.writePath().toString(),
writeInfo.targetPath().toString(),
writeInfo.getWritePath().toString(),
writeInfo.getTargetPath().toString(),
onCommit,
hiveWriterStats);
}
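
Even after the revert, HiveWriterFactory still calls the new TrinoFileSystem API in createWriter, so the Hadoop Paths now carried by WriteInfo are converted back to Location at the call site. A condensed sketch of that boundary check; the wrapper class and method are illustrative, and the error codes are assumed to come from HiveErrorCode as elsewhere in the Hive module:

import java.io.IOException;

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.spi.TrinoException;
import org.apache.hadoop.fs.Path;

import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;     // assumed, as imported in the Hive module
import static io.trino.plugin.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;  // assumed, as imported in the Hive module

final class TargetDirectoryChecks
{
    private TargetDirectoryChecks() {}

    // targetPath is the Hadoop Path taken from WriteInfo#getTargetPath()
    static void verifyTargetDoesNotExist(TrinoFileSystem fileSystem, Path targetPath)
    {
        // directoryExists(...) takes a Trino Location, so bridge through the string form
        Location targetLocation = Location.of(targetPath.toString());
        try {
            if (fileSystem.directoryExists(targetLocation).orElse(false)) {
                throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, "Target directory already exists: " + targetLocation);
            }
        }
        catch (IOException e) {
            throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed checking target directory: " + targetLocation, e);
        }
    }
}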