[fix](catalog) refactor location path and support default fs (apache#39116)

`LocationPath` is used to normalize the path URI of files in external
tables. Previously, however, a `LocationPath` was constructed everywhere
one was needed, and constructing it is a heavy operation.

This PR refactors the logic as follows (a sketch of the resulting pattern
follows the list):
1. A `LocationPath` is now created only once, when the file split is
   generated, and it is saved in the `FileSplit`.
2. All subsequent logic reuses the `LocationPath` stored in the file split.
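A condensed sketch of the new pattern (stand-in types only; the real `LocationPath`, `FileSplit`, and `TFileType` in the Doris code base are much richer than shown here, and only `isBindBroker`/`getTFileTypeForBE` mirror names visible in the diff below):

```java
// Minimal sketch, not the actual Doris classes: illustrates "build LocationPath once,
// cache the derived file type on the split, reuse it everywhere afterwards".

enum TFileType { FILE_HDFS, FILE_S3, FILE_BROKER, FILE_STREAM }

// Stand-in for org.apache.doris.common.util.LocationPath.
// In the real class the constructor is the heavy part (scheme normalization,
// default fs handling, broker binding, ...), so it should run once per file.
class LocationPath {
    private final String normalized;

    LocationPath(String rawPath) {
        this.normalized = rawPath; // real normalization omitted
    }

    String get() {
        return normalized;
    }

    boolean isBindBroker() {
        return false; // simplified
    }

    TFileType getTFileTypeForBE() {
        return normalized.startsWith("hdfs://") ? TFileType.FILE_HDFS : TFileType.FILE_S3;
    }
}

// Stand-in for org.apache.doris.datasource.FileSplit: the LocationPath and the derived
// TFileType are computed when the split is created and cached for later stages.
class FileSplit {
    final LocationPath path;
    final TFileType locationType;

    FileSplit(LocationPath path) {
        this.path = path;
        this.locationType = path.isBindBroker() ? TFileType.FILE_BROKER : path.getTFileTypeForBE();
    }
}

public class LocationPathSketch {
    public static void main(String[] args) {
        // 1. LocationPath is built once, while the split is generated.
        FileSplit split = new FileSplit(new LocationPath("hdfs://nn:8020/warehouse/t/part-0.parquet"));
        // 2. Scan-range creation, compress-type inference, etc. reuse the cached values
        //    instead of constructing a new LocationPath from the raw path string.
        System.out.println(split.locationType); // FILE_HDFS
        System.out.println(split.path.get());
    }
}
```

Caching the derived `TFileType` on the split is also what removes the per-split `IcebergSplit`/`BIND_BROKER_NAME` special-casing from the scan node, as seen in `splitToScanRange` below.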
morningman authored Aug 21, 2024
1 parent 9ee7f2a commit 5b55f4b
Showing 28 changed files with 444 additions and 564 deletions.
433 changes: 220 additions & 213 deletions fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java

Large diffs are not rendered by default.

@@ -35,10 +35,8 @@
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.datasource.iceberg.source.IcebergSplit;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
@@ -268,7 +266,7 @@ public void createScanRangeLocations() throws UserException {
boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL;
if (isCsvOrJson || isWal) {
params.setFileAttributes(getFileAttributes());
if (getLocationType() == TFileType.FILE_STREAM) {
if (isFileStreamType()) {
params.setFileType(TFileType.FILE_STREAM);
FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
ExternalFileTableValuedFunction tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
@@ -309,19 +307,13 @@ public void createScanRangeLocations() throws UserException {
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) {
if (splitAssignment.getSampleSplit() == null && !isFileStreamType()) {
return;
}
selectedSplitNum = numApproximateSplits();

TFileType locationType;
FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
if (fileSplit instanceof IcebergSplit
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
locationType = TFileType.FILE_BROKER;
} else {
locationType = getLocationType(fileSplit.getPath().toString());
}
TFileType locationType = fileSplit.getLocationType();
totalFileSize = fileSplit.getLength() * selectedSplitNum;
long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime();
// Not accurate, only used to estimate concurrency.
@@ -351,7 +343,7 @@ public void createScanRangeLocations() throws UserException {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
selectedSplitNum = inputSplits.size();
if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
if (inputSplits.isEmpty() && !isFileStreamType()) {
return;
}
Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
@@ -379,14 +371,6 @@ private TScanRangeLocations splitToScanRange(
Split split,
List<String> pathPartitionKeys) throws UserException {
FileSplit fileSplit = (FileSplit) split;
TFileType locationType;
if (fileSplit instanceof IcebergSplit
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
locationType = TFileType.FILE_BROKER;
} else {
locationType = getLocationType(fileSplit.getPath().toString());
}

TScanRangeLocations curLocations = newLocations();
// If fileSplit has partition values, use the values collected from hive partitions.
// Otherwise, use the values in file path.
@@ -396,41 +380,42 @@ private TScanRangeLocations splitToScanRange(
isACID = hiveSplit.isACID();
}
List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
? BrokerUtil.parseColumnsFromPath(fileSplit.getPathString(), pathPartitionKeys,
false, isACID) : fileSplit.getPartitionValues();

TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
locationType);
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
TFileCompressType fileCompressType = getFileCompressType(fileSplit);
rangeDesc.setCompressType(fileCompressType);
if (isACID) {
HiveSplit hiveSplit = (HiveSplit) fileSplit;
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
deleteDeltaDescs.add(deleteDeltaDesc);
if (fileSplit instanceof HiveSplit) {
if (isACID) {
HiveSplit hiveSplit = (HiveSplit) fileSplit;
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
deleteDeltaDescs.add(deleteDeltaDesc);
}
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
} else {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value());
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
} else if (fileSplit instanceof HiveSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value());
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}

setScanParams(rangeDesc, fileSplit);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
setLocationPropertiesIfNecessary(backend, locationType, locationProperties);
setLocationPropertiesIfNecessary(backend, fileSplit.getLocationType(), locationProperties);
location.setBackendId(backend.getId());
location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
curLocations.addToLocations(location);
@@ -493,8 +478,7 @@ private TScanRangeLocations newLocations() {
}

private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
List<String> columnsFromPathKeys, TFileType locationType)
throws UserException {
List<String> columnsFromPathKeys) {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
rangeDesc.setStartOffset(fileSplit.getStart());
rangeDesc.setSize(fileSplit.getLength());
@@ -504,10 +488,10 @@ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> col
rangeDesc.setColumnsFromPath(columnsFromPath);
rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);

rangeDesc.setFileType(locationType);
rangeDesc.setPath(fileSplit.getPath().toString());
if (locationType == TFileType.FILE_HDFS) {
URI fileUri = fileSplit.getPath().toUri();
rangeDesc.setFileType(fileSplit.getLocationType());
rangeDesc.setPath(fileSplit.getPath().toStorageLocation().toString());
if (fileSplit.getLocationType() == TFileType.FILE_HDFS) {
URI fileUri = fileSplit.getPath().getPath().toUri();
rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());
}
rangeDesc.setModificationTime(fileSplit.getModificationTime());
@@ -554,14 +538,16 @@ public int getNumInstances() {
return scanRangeLocations.size();
}

protected abstract TFileType getLocationType() throws UserException;

protected abstract TFileType getLocationType(String location) throws UserException;
// Return true if this is a TFileType.FILE_STREAM type.
// Currently, only TVFScanNode may be TFileType.FILE_STREAM type.
protected boolean isFileStreamType() throws UserException {
return false;
}

protected abstract TFileFormatType getFileFormatType() throws UserException;

protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
return Util.inferFileCompressTypeByPath(fileSplit.getPath().toString());
return Util.inferFileCompressTypeByPath(fileSplit.getPathString());
}

protected TFileAttributes getFileAttributes() throws UserException {
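Side note on the hunk above: the abstract `getLocationType()`/`getLocationType(String)` pair is replaced by a single overridable `isFileStreamType()` hook that defaults to `false`. A minimal, hypothetical sketch of how a stream-serving subclass (per the code comment, only the TVF scan node today) might use it; the class and field names below are illustrative, not the actual `TVFScanNode` code:

```java
// Hypothetical sketch of the new hook; class and field names are illustrative only.
abstract class FileScanNodeSketch {
    // Default: most scan nodes never read FILE_STREAM data.
    protected boolean isFileStreamType() {
        return false;
    }
}

class StreamTvfScanNodeSketch extends FileScanNodeSketch {
    private final boolean readsFromStream;

    StreamTvfScanNodeSketch(boolean readsFromStream) {
        this.readsFromStream = readsFromStream;
    }

    @Override
    protected boolean isFileStreamType() {
        // A table-valued function backed by a client-side stream would return true here,
        // so callers branch on isFileStreamType() instead of comparing TFileType values.
        return readsFromStream;
    }
}

public class IsFileStreamDemo {
    public static void main(String[] args) {
        FileScanNodeSketch node = new StreamTvfScanNodeSketch(true);
        System.out.println(node.isFileStreamType()); // true
    }
}
```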
@@ -25,6 +25,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
@@ -46,7 +47,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -257,14 +257,14 @@ protected void setDefaultValueExprs(TableIf tbl,
}
}

protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
protected List<Split> splitFile(LocationPath path, long blockSize, BlockLocation[] blockLocations, long length,
long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator)
throws IOException {
if (blockLocations == null) {
blockLocations = new BlockLocation[0];
}
List<Split> result = Lists.newArrayList();
TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.toString());
TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get());
if (!splittable || compressType != TFileCompressType.PLAIN) {
if (LOG.isDebugEnabled()) {
LOG.debug("Path {} is not splittable.", path);
25 changes: 15 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
@@ -17,16 +17,17 @@

package org.apache.doris.datasource;

import org.apache.doris.common.util.LocationPath;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TFileType;

import lombok.Data;
import org.apache.hadoop.fs.Path;

import java.util.List;

@Data
public class FileSplit implements Split {
public Path path;
public LocationPath path;
public long start;
// length of this split, in bytes
public long length;
@@ -43,27 +44,30 @@ public class FileSplit implements Split {
public List<String> partitionValues;

public List<String> alternativeHosts;
// the location type for BE, eg: HDFS, LOCAL, S3
protected TFileType locationType;

public FileSplit(Path path, long start, long length, long fileLength,
public FileSplit(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts, List<String> partitionValues) {
this.path = path;
this.start = start;
this.length = length;
this.fileLength = fileLength;
this.modificationTime = modificationTime;
// BE requires modification time to be non-negative.
this.modificationTime = modificationTime < 0 ? 0 : modificationTime;
this.hosts = hosts == null ? new String[0] : hosts;
this.partitionValues = partitionValues;
}

public FileSplit(Path path, long start, long length, long fileLength,
String[] hosts, List<String> partitionValues) {
this(path, start, length, fileLength, 0, hosts, partitionValues);
this.locationType = path.isBindBroker() ? TFileType.FILE_BROKER : path.getTFileTypeForBE();
}

public String[] getHosts() {
return hosts;
}

public TFileType getLocationType() {
return locationType;
}

@Override
public Object getInfo() {
return null;
@@ -79,7 +83,8 @@ public static class FileSplitCreator implements SplitCreator {
public static final FileSplitCreator DEFAULT = new FileSplitCreator();

@Override
public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts,
public Split create(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts,
List<String> partitionValues) {
return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
}
@@ -17,13 +17,12 @@

package org.apache.doris.datasource;

import org.apache.doris.common.util.LocationPath;
import org.apache.doris.spi.Split;

import org.apache.hadoop.fs.Path;

import java.util.List;

public interface SplitCreator {
Split create(Path path, long start, long length, long fileLength,
Split create(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts, List<String> partitionValues);
}
@@ -369,11 +369,7 @@ private FileCacheValue getFileCache(String location, String inputFormat,
for (RemoteFile remoteFile : remoteFiles) {
String srcPath = remoteFile.getPath().toString();
LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties());
Path convertedPath = locationPath.toStorageLocation();
if (!convertedPath.toString().equals(srcPath)) {
remoteFile.setPath(convertedPath);
}
result.addFile(remoteFile);
result.addFile(remoteFile, locationPath);
}
} else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) {
// User may manually remove partition under HDFS, in this case,
@@ -813,14 +809,17 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
if (status.ok()) {
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter(
name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.collect(Collectors.toList());
deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
continue;
}
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(),
catalog.getProperties());
fileCacheValue.addFile(file, path);
});
} else {
throw new RuntimeException(status.getErrMsg());
}
@@ -837,8 +836,12 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(),
catalog.getProperties());
fileCacheValue.addFile(file, path);
});
} else {
throw new RuntimeException(status.getErrMsg());
}
@@ -998,11 +1001,11 @@ public static class FileCacheValue {

private AcidInfo acidInfo;

public void addFile(RemoteFile file) {
public void addFile(RemoteFile file, LocationPath locationPath) {
if (isFileVisible(file.getPath())) {
HiveFileStatus status = new HiveFileStatus();
status.setBlockLocations(file.getBlockLocations());
status.setPath(file.getPath());
status.setPath(locationPath);
status.length = file.getSize();
status.blockSize = file.getBlockSize();
status.modificationTime = file.getModificationTime();
@@ -1014,7 +1017,6 @@ public int getValuesSize() {
return partitionValues == null ? 0 : partitionValues.size();
}


public AcidInfo getAcidInfo() {
return acidInfo;
}
@@ -1062,7 +1064,7 @@ private static boolean isGeneratedPath(String name) {
@Data
public static class HiveFileStatus {
BlockLocation[] blockLocations;
Path path;
LocationPath path;
long length;
long blockSize;
long modificationTime;