Skip to content

Commit

Permalink
Use data size for directory listing cache
Browse files Browse the repository at this point in the history
  • Loading branch information
NikhilCollooru committed Jul 13, 2024
1 parent 56bfe64 commit 0da1bea
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 25 deletions.
2 changes: 2 additions & 0 deletions presto-docs/src/main/sphinx/connector/hive.rst
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ Property Name Description

``hive.skip-empty-files`` Enable skipping empty files. Otherwise, it will produce an ``false``
error iterating through empty files.

``hive.file-status-cache.max-retained-size`` Maximum retained size of the directory listing cache, specified as a data size such as ``100kB`` ``0kB``
================================================== ============================================================ ============

Metastore Configuration Properties
Expand Down
6 changes: 6 additions & 0 deletions presto-hdfs-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
<artifactId>drift-api</artifactId>
</dependency>

<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<scope>provided</scope>
</dependency>

<!-- for testing -->
<dependency>
<groupId>org.testng</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;
import com.google.common.collect.ImmutableList;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.Nullable;

Expand All @@ -30,6 +31,8 @@
@ThriftStruct
public class BlockLocation
{
private static final long INSTANCE_SIZE = ClassLayout.parseClass(BlockLocation.class).instanceSize();

private final List<String> hosts;
private final long offset;
private final long length;
Expand Down Expand Up @@ -83,6 +86,11 @@ public long getLength()
return length;
}

/**
 * Estimates the memory retained by this block location.
 *
 * @return the class instance size plus the character count of every host name
 */
public long getRetainedSizeInBytes()
{
    long hostNameChars = 0;
    for (String host : hosts) {
        hostNameChars += host.length();
    }
    return INSTANCE_SIZE + hostNameChars;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.openjdk.jol.info.ClassLayout;

import java.io.IOException;
import java.util.List;
Expand All @@ -34,6 +35,8 @@
public class HiveFileInfo
implements Comparable
{
private static final long INSTANCE_SIZE = ClassLayout.parseClass(HiveFileInfo.class).instanceSize();

private final String path;
private final boolean isDirectory;
private final List<BlockLocation> blockLocations;
Expand Down Expand Up @@ -130,6 +133,14 @@ public Path getPath()
return new Path(path);
}

/**
 * Estimates the memory retained by this file entry.
 * <p>
 * The estimate is the class instance size plus the path length, the retained size of
 * every block location, the length of the extra file info payload (when present), and
 * the combined key and value lengths of the custom split info.
 *
 * @return the estimated retained size
 */
public long getRetainedSizeInBytes()
{
    long total = INSTANCE_SIZE + path.length();
    for (BlockLocation location : blockLocations) {
        total += location.getRetainedSizeInBytes();
    }
    if (extraFileInfo.isPresent()) {
        total += extraFileInfo.get().length;
    }
    total += customSplitInfo.entrySet().stream()
            .mapToLong(entry -> entry.getKey().length() + entry.getValue().length())
            .sum();
    return total;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import com.google.common.cache.Weigher;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.apache.hadoop.fs.Path;
import org.openjdk.jol.info.ClassLayout;
import org.weakref.jmx.Managed;

import javax.inject.Inject;
Expand All @@ -46,12 +48,13 @@
import static com.facebook.presto.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

public class CachingDirectoryLister
implements DirectoryLister
{
private final Cache<String, List<HiveFileInfo>> cache;
private final Cache<String, ValueHolder> cache;
private final CachedTableChecker cachedTableChecker;
private final DirectoryLister delegate;

Expand All @@ -61,16 +64,16 @@ public CachingDirectoryLister(@ForCachingDirectoryLister DirectoryLister delegat
this(
delegate,
hiveClientConfig.getFileStatusCacheExpireAfterWrite(),
hiveClientConfig.getFileStatusCacheMaxSize(),
hiveClientConfig.getFileStatusCacheMaxRetainedSize(),
hiveClientConfig.getFileStatusCacheTables());
}

public CachingDirectoryLister(DirectoryLister delegate, Duration expireAfterWrite, long maxSize, List<String> tables)
public CachingDirectoryLister(DirectoryLister delegate, Duration expireAfterWrite, DataSize maxSize, List<String> tables)
{
this.delegate = requireNonNull(delegate, "delegate is null");
cache = CacheBuilder.newBuilder()
.maximumWeight(maxSize)
.weigher((Weigher<String, List<HiveFileInfo>>) (key, value) -> value.size())
.maximumWeight(maxSize.toBytes())
.weigher((Weigher<String, ValueHolder>) (key, value) -> toIntExact(key.length() + value.getRetainedSizeInBytes()))
.expireAfterWrite(expireAfterWrite.toMillis(), TimeUnit.MILLISECONDS)
.recordStats()
.build();
Expand All @@ -91,8 +94,9 @@ public Iterator<HiveFileInfo> list(
if (hiveDirectoryContext.isCacheable()) {
// DO NOT USE Caching, when cache is disabled.
// This is useful for debugging issues, when cache is explicitly disabled via session property.
List<HiveFileInfo> files = cache.getIfPresent(path.toString());
if (files != null) {
ValueHolder value = Optional.ofNullable(cache.getIfPresent(path.toString())).orElse(null);
if (value != null) {
List<HiveFileInfo> files = value.getFiles();
runtimeStats.addMetricValue(DIRECTORY_LISTING_CACHE_HIT, NONE, 1);
runtimeStats.addMetricValue(DIRECTORY_LISTING_TIME_NANOS, NANO, System.nanoTime() - startTime);
runtimeStats.addMetricValue(FILES_READ_COUNT, NONE, files.size());
Expand Down Expand Up @@ -122,7 +126,7 @@ public boolean hasNext()
if (!hasNext) {
runtimeStats.addMetricValue(FILES_READ_COUNT, NONE, files.size());
if (enableCaching) {
cache.put(path.toString(), ImmutableList.copyOf(files));
cache.put(path.toString(), new ValueHolder(files));
}
}
return hasNext;
Expand All @@ -145,8 +149,8 @@ public void invalidateDirectoryListCache(Optional<String> directoryPath)
throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Directory path can not be a empty string");
}

List<HiveFileInfo> files = cache.getIfPresent(directoryPath.get());
if (files == null) {
ValueHolder value = cache.getIfPresent(directoryPath.get());
if (value == null) {
throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Given directory path is not cached : " + directoryPath);
}
cache.invalidate(directoryPath.get());
Expand Down Expand Up @@ -204,6 +208,28 @@ public long getSize()
return cache.size();
}

/**
 * Cache value wrapping an immutable copy of a file listing, together with an
 * estimate of the memory it retains.
 */
private static class ValueHolder
{
    private static final long INSTANCE_SIZE = ClassLayout.parseClass(ValueHolder.class).instanceSize();

    private final List<HiveFileInfo> files;

    public ValueHolder(List<HiveFileInfo> files)
    {
        requireNonNull(files, "files is null");
        this.files = ImmutableList.copyOf(files);
    }

    public List<HiveFileInfo> getFiles()
    {
        return files;
    }

    /**
     * @return the holder's instance size plus the retained size of every held file entry
     */
    public long getRetainedSizeInBytes()
    {
        long filesSizeInBytes = 0;
        for (HiveFileInfo file : files) {
            filesSizeInBytes += file.getRetainedSizeInBytes();
        }
        return INSTANCE_SIZE + filesSizeInBytes;
    }
}

private static class CachedTableChecker
{
private final Set<SchemaTableName> cachedTableNames;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
Expand Down Expand Up @@ -163,7 +164,7 @@ public class HiveClientConfig
private boolean parquetPushdownFilterEnabled;
private boolean adaptiveFilterReorderingEnabled = true;
private Duration fileStatusCacheExpireAfterWrite = new Duration(0, TimeUnit.SECONDS);
private long fileStatusCacheMaxSize;
private DataSize fileStatusCacheMaxRetainedSize = new DataSize(0, KILOBYTE);
private List<String> fileStatusCacheTables = ImmutableList.of();

private DataSize pageFileStripeMaxSize = new DataSize(24, MEGABYTE);
Expand Down Expand Up @@ -857,15 +858,15 @@ public HiveClientConfig setFileStatusCacheTables(String fileStatusCacheTables)
return this;
}

public long getFileStatusCacheMaxSize()
public DataSize getFileStatusCacheMaxRetainedSize()
{
return fileStatusCacheMaxSize;
return fileStatusCacheMaxRetainedSize;
}

@Config("hive.file-status-cache-size")
public HiveClientConfig setFileStatusCacheMaxSize(long fileStatusCacheMaxSize)
@Config("hive.file-status-cache.max-retained-size")
public HiveClientConfig setFileStatusCacheMaxRetainedSize(DataSize fileStatusCacheMaxRetainedSize)
{
this.fileStatusCacheMaxSize = fileStatusCacheMaxSize;
this.fileStatusCacheMaxRetainedSize = fileStatusCacheMaxRetainedSize;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -284,21 +285,21 @@ public void testCachingDirectoryLister()
new CachingDirectoryLister(
new HadoopDirectoryLister(),
new Duration(5, TimeUnit.MINUTES),
1000,
new DataSize(100, KILOBYTE),
ImmutableList.of("test_dbname.test_table")),
"test_dbname.test_table");
testCachingDirectoryLister(
new CachingDirectoryLister(
new HadoopDirectoryLister(),
new Duration(5, TimeUnit.MINUTES),
1000,
new DataSize(100, KILOBYTE),
ImmutableList.of("*")),
"*");
testCachingDirectoryLister(
new CachingDirectoryLister(
new HadoopDirectoryLister(),
new Duration(5, TimeUnit.MINUTES),
1000,
new DataSize(100, KILOBYTE),
ImmutableList.of("*")),
"");
assertThrows(
Expand All @@ -307,7 +308,7 @@ public void testCachingDirectoryLister()
new CachingDirectoryLister(
new HadoopDirectoryLister(),
new Duration(5, TimeUnit.MINUTES),
1000,
new DataSize(100, KILOBYTE),
ImmutableList.of("*", "test_dbname.test_table")),
"*,test_dbname.test_table"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.TestHiveUtil.nonDefaultTimeZone;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class TestHiveClientConfig
Expand Down Expand Up @@ -123,7 +124,7 @@ public void testDefaults()
.setParquetPushdownFilterEnabled(false)
.setAdaptiveFilterReorderingEnabled(true)
.setFileStatusCacheExpireAfterWrite(new Duration(0, TimeUnit.SECONDS))
.setFileStatusCacheMaxSize(0)
.setFileStatusCacheMaxRetainedSize(new DataSize(0, KILOBYTE))
.setFileStatusCacheTables("")
.setPageFileStripeMaxSize(new DataSize(24, Unit.MEGABYTE))
.setBucketFunctionTypeForExchange(HIVE_COMPATIBLE)
Expand Down Expand Up @@ -251,7 +252,7 @@ public void testExplicitPropertyMappings()
.put("hive.parquet.pushdown-filter-enabled", "true")
.put("hive.adaptive-filter-reordering-enabled", "false")
.put("hive.file-status-cache-tables", "foo.bar1, foo.bar2")
.put("hive.file-status-cache-size", "1000")
.put("hive.file-status-cache.max-retained-size", "500MB")
.put("hive.file-status-cache-expire-time", "30m")
.put("hive.pagefile.writer.stripe-max-size", "1kB")
.put("hive.bucket-function-type-for-exchange", "PRESTO_NATIVE")
Expand Down Expand Up @@ -375,7 +376,7 @@ public void testExplicitPropertyMappings()
.setParquetPushdownFilterEnabled(true)
.setAdaptiveFilterReorderingEnabled(false)
.setFileStatusCacheTables("foo.bar1,foo.bar2")
.setFileStatusCacheMaxSize(1000)
.setFileStatusCacheMaxRetainedSize((new DataSize(500, MEGABYTE)))
.setFileStatusCacheExpireAfterWrite(new Duration(30, TimeUnit.MINUTES))
.setPageFileStripeMaxSize(new DataSize(1, Unit.KILOBYTE))
.setBucketFunctionTypeForExchange(PRESTO_NATIVE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ hive.collect-column-statistics-on-write=true

# List file cache
hive.file-status-cache-expire-time=24h
hive.file-status-cache-size=100000000
hive.file-status-cache.max-retained-size=100kB
hive.file-status-cache-tables=*
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ hive.collect-column-statistics-on-write=true

# List file cache
hive.file-status-cache-expire-time=24h
hive.file-status-cache-size=100000000
hive.file-status-cache.max-retained-size=100kB
hive.file-status-cache-tables=*

0 comments on commit 0da1bea

Please sign in to comment.