Clean up extraneous output files in Iceberg DML
With query/task retries there is a chance that extra files, which do not
make it into the snapshot, are written to the table's directory. While
most such cases should be cleaned up by the writers on the workers,
there is a slim chance that some of these files will survive query
execution (e.g. if a worker machine is killed).

This commit adds a pre-commit routine on the coordinator which deletes
whatever remains. The routine is still opportunistic and not guaranteed
to delete everything, since extra files may still be written after the
cleanup routine has already completed, but we try our best. Any files
that remain do not affect query correctness.
losipiuk committed Feb 11, 2022
1 parent 894c7ad commit 955d243
Showing 5 changed files with 162 additions and 15 deletions.
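The diff below implements this cleanup. As a rough illustration of the approach, here is a minimal, hypothetical sketch that uses plain java.nio instead of Trino's HdfsEnvironment and Hadoop FileSystem APIs (names such as ExtraOutputFileCleaner are invented for this example and are not part of the commit). Writers prefix every data file name with the query id, so the coordinator can list each output location and delete files carrying that prefix that were not committed to the snapshot:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical sketch only; not part of this commit.
final class ExtraOutputFileCleaner
{
    private ExtraOutputFileCleaner() {}

    // Delete files in location that were written by queryId (file name starts with "<queryId>-")
    // but are not in the set of files being committed to the snapshot.
    static void cleanExtraOutputFiles(Path location, String queryId, Set<Path> committedFiles)
            throws IOException
    {
        if (!Files.isDirectory(location)) {
            // location may not exist if no files were actually written
            return;
        }
        try (Stream<Path> files = Files.list(location)) {
            files.filter(file -> file.getFileName().toString().startsWith(queryId + "-"))
                    .filter(file -> !committedFiles.contains(file))
                    .forEach(file -> {
                        try {
                            Files.deleteIfExists(file);
                        }
                        catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    });
        }
    }
}

The actual implementation in IcebergMetadata below follows the same shape, but obtains a Hadoop FileSystem through HdfsEnvironment and first groups the written files by their parent directory.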
@@ -21,9 +21,12 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import io.trino.plugin.hive.HiveApplyProjectionUtil;
import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation;
import io.trino.plugin.hive.HiveWrittenPartitions;
@@ -69,6 +72,10 @@
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
@@ -89,10 +96,13 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -102,6 +112,8 @@
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -134,7 +146,9 @@
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
@@ -143,9 +157,14 @@
public class IcebergMetadata
implements ConnectorMetadata
{
private static final Logger log = Logger.get(IcebergMetadata.class);

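// captures the parent directory of a path: group(1) is everything before the final "/<file name>" segment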
private static final Pattern PATH_PATTERN = Pattern.compile("(.*)/[^/]+");

private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final TrinoCatalog catalog;
private final HdfsEnvironment hdfsEnvironment;

private final Map<String, Long> snapshotIds = new ConcurrentHashMap<>();

@@ -154,11 +173,13 @@ public class IcebergMetadata
public IcebergMetadata(
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
TrinoCatalog catalog)
TrinoCatalog catalog,
HdfsEnvironment hdfsEnvironment)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.catalog = requireNonNull(catalog, "catalog is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
}

@Override
@@ -449,7 +470,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
getColumns(transaction.table().schema(), typeManager),
transaction.table().location(),
getFileFormat(transaction.table()),
transaction.table().properties());
transaction.table().properties(),
retryMode);
}

@Override
@@ -509,7 +531,8 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
getColumns(icebergTable.schema(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
icebergTable.properties());
icebergTable.properties(),
retryMode);
}

@Override
@@ -528,6 +551,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
.toArray(Type[]::new);

AppendFiles appendFiles = transaction.newAppend();
ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
for (CommitTaskData task : commitTasks) {
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
.withPath(task.getPath())
@@ -542,6 +566,12 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
}

appendFiles.appendFile(builder.build());
writtenFiles.add(task.getPath());
}

// try to leave as little garbage as possible behind
if (table.getRetryMode() != NO_RETRIES) {
cleanExtraOutputFiles(session, writtenFiles.build());
}

appendFiles.commit();
@@ -553,6 +583,72 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
.collect(toImmutableList())));
}

private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, String location, Set<String> filesToKeep)
{
Deque<String> filesToDelete = new ArrayDeque<>();
try {
log.debug("Deleting failed attempt files from %s for query %s", location, queryId);
FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, new Path(location));
if (!fileSystem.exists(new Path(location))) {
// directory may not exist if no files were actually written
return;
}

// files within a given partition are written flat into the location, so there is no need to list recursively
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path(location), false);
while (iterator.hasNext()) {
Path file = iterator.next().getPath();
if (isFileCreatedByQuery(file.getName(), queryId) && !filesToKeep.contains(location + "/" + file.getName())) {
filesToDelete.add(file.getName());
}
}

if (filesToDelete.isEmpty()) {
return;
}

log.info("Found %s files to delete and %s to retain in location %s for query %s", filesToDelete.size(), filesToKeep.size(), location, queryId);
ImmutableList.Builder<String> deletedFilesBuilder = ImmutableList.builder();
Iterator<String> filesToDeleteIterator = filesToDelete.iterator();
while (filesToDeleteIterator.hasNext()) {
String fileName = filesToDeleteIterator.next();
log.debug("Deleting failed attempt file %s/%s for query %s", location, fileName, queryId);
fileSystem.delete(new Path(location, fileName), false);
deletedFilesBuilder.add(fileName);
filesToDeleteIterator.remove();
}

List<String> deletedFiles = deletedFilesBuilder.build();
if (!deletedFiles.isEmpty()) {
log.info("Deleted failed attempt files %s from %s for query %s", deletedFiles, location, queryId);
}
}
catch (IOException e) {
throw new TrinoException(IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR,
format("Could not clean up extraneous output files; remaining files: %s", filesToDelete), e);
}
}

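// Data files are named "<queryId>-<randomUUID>.<extension>" by the writers (see the createWriter change below),
// so the query id prefix identifies files produced by this query.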
private boolean isFileCreatedByQuery(String fileName, String queryId)
{
verify(!queryId.contains("-"), "queryId(%s) should not contain hyphens", queryId);
return fileName.startsWith(queryId + "-");
}

private static Set<String> getOutputFilesLocations(Set<String> writtenFiles)
{
return writtenFiles.stream()
.map(IcebergMetadata::getLocation)
.collect(toImmutableSet());
}

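// extracts the parent directory from a file path, e.g. "<location>/<file name>" -> "<location>"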
private static String getLocation(String path)
{
Matcher matcher = PATH_PATTERN.matcher(path);
verify(matcher.matches(), "path %s does not match pattern", path);
return matcher.group(1);
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
@@ -579,13 +675,13 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(

switch (procedureId) {
case OPTIMIZE:
return getTableHandleForOptimize(session, tableHandle, executeProperties);
return getTableHandleForOptimize(session, tableHandle, executeProperties, retryMode);
}

throw new IllegalArgumentException("Unknown procedure: " + procedureId);
}

private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties, RetryMode retryMode)
{
DataSize maxScannedFileSize = (DataSize) executeProperties.get("file_size_threshold");
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
@@ -599,7 +695,8 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(Connecto
getColumns(icebergTable.schema(), typeManager),
getFileFormat(icebergTable),
icebergTable.properties(),
maxScannedFileSize),
maxScannedFileSize,
retryMode != NO_RETRIES),
icebergTable.location()));
}

@@ -659,13 +756,13 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa
IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle;
switch (executeHandle.getProcedureId()) {
case OPTIMIZE:
finishOptimize(executeHandle, fragments, splitSourceInfo);
finishOptimize(session, executeHandle, fragments, splitSourceInfo);
return;
}
throw new IllegalArgumentException("Unknown procedure '" + executeHandle.getProcedureId() + "'");
}

private void finishOptimize(IcebergTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
{
IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle();
Table icebergTable = transaction.table();
@@ -707,6 +804,14 @@ private void finishOptimize(IcebergTableExecuteHandle executeHandle, Collection<
return;
}

// try to leave as little garbage as possible behind
if (optimizeHandle.isRetriesEnabled()) {
cleanExtraOutputFiles(
session,
newFiles.stream()
.map(dataFile -> dataFile.path().toString())
.collect(toImmutableSet()));
}
RewriteFiles rewriteFiles = transaction.newRewrite();
rewriteFiles.rewriteFiles(scannedFiles, newFiles);
rewriteFiles.commit();
@@ -1084,7 +1189,8 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession
getColumns(icebergTable.schema(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
icebergTable.properties());
icebergTable.properties(),
retryMode);
}

@Override
@@ -1112,6 +1218,7 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
.toArray(Type[]::new);

AppendFiles appendFiles = transaction.newFastAppend();
ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
for (CommitTaskData task : commitTasks) {
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
.withPath(task.getPath())
@@ -1126,6 +1233,7 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
}

appendFiles.appendFile(builder.build());
writtenFiles.add(task.getPath());
}

String dependencies = sourceTableHandles.stream()
@@ -1135,6 +1243,11 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
.distinct()
.collect(joining(","));

// try to leave as little garbage as possible behind
if (table.getRetryMode() != NO_RETRIES) {
cleanExtraOutputFiles(session, writtenFiles.build());
}

// Update the 'dependsOnTables' property that tracks tables on which the materialized view depends and the corresponding snapshot ids of the tables
appendFiles.set(DEPENDS_ON_TABLES, dependencies);
appendFiles.commit();
@@ -1146,6 +1259,15 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
.collect(toImmutableList())));
}

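// Derive the set of output locations from the files being committed and, for each location,
// delete leftover files written by this query that are not part of the commit.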
private void cleanExtraOutputFiles(ConnectorSession session, Set<String> writtenFiles)
{
HdfsContext hdfsContext = new HdfsContext(session);
Set<String> locations = getOutputFilesLocations(writtenFiles);
for (String location : locations) {
cleanExtraOutputFiles(hdfsContext, session.getQueryId(), location, writtenFiles);
}
}

@Override
public List<SchemaTableName> listMaterializedViews(ConnectorSession session, Optional<String> schemaName)
{
Expand Down Expand Up @@ -1202,7 +1324,7 @@ public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession s
strings = strings.subList(1, 3);
}
else if (strings.size() != 2) {
throw new TrinoException(ICEBERG_INVALID_METADATA, String.format("Invalid table name in '%s' property: %s'", DEPENDS_ON_TABLES, strings));
throw new TrinoException(ICEBERG_INVALID_METADATA, format("Invalid table name in '%s' property: %s'", DEPENDS_ON_TABLES, strings));
}
String schema = strings.get(0);
String name = strings.get(1);
@@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;

import io.airlift.json.JsonCodec;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;

@@ -26,20 +27,23 @@ public class IcebergMetadataFactory
private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final TrinoCatalogFactory catalogFactory;
private final HdfsEnvironment hdfsEnvironment;

@Inject
public IcebergMetadataFactory(
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
TrinoCatalogFactory catalogFactory)
TrinoCatalogFactory catalogFactory,
HdfsEnvironment hdfsEnvironment)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
}

public IcebergMetadata create(ConnectorIdentity identity)
{
return new IcebergMetadata(typeManager, commitTaskCodec, catalogFactory.create(identity));
return new IcebergMetadata(typeManager, commitTaskCodec, catalogFactory.create(identity), hdfsEnvironment);
}
}
@@ -323,7 +323,9 @@ private void closeWriter(WriteContext writeContext)

private WriteContext createWriter(Optional<PartitionData> partitionData)
{
String fileName = fileFormat.toIceberg().addExtension(randomUUID().toString());
// prepend the query id to the file name so we can determine which files were written by which query. This is needed for opportunistic cleanup of extra files,
// which may be present for a successfully completing query when failure recovery mechanisms are in use.
String fileName = fileFormat.toIceberg().addExtension(session.getQueryId() + "-" + randomUUID());
Path outputPath = partitionData.map(partition -> new Path(locationProvider.newDataLocation(partitionSpec, partition, fileName)))
.orElse(new Path(locationProvider.newDataLocation(fileName)));
