diff --git a/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2TokenExchange.java b/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2TokenExchange.java index 73982230c110..335f7d6b8ea4 100644 --- a/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2TokenExchange.java +++ b/core/trino-main/src/main/java/io/trino/server/security/oauth2/OAuth2TokenExchange.java @@ -91,6 +91,8 @@ public ListenableFuture getTokenPoll(UUID authId) public void dropToken(UUID authId) { + // TODO this may not invalidate ongoing loads (https://github.com/trinodb/trino/issues/10512, https://github.com/google/guava/issues/1881). + // Determine whether this is OK here. cache.invalidate(hashAuthId(authId)); } diff --git a/core/trino-main/src/main/java/io/trino/type/BlockTypeOperators.java b/core/trino-main/src/main/java/io/trino/type/BlockTypeOperators.java index 4219026c6ec5..dad7c299d231 100644 --- a/core/trino-main/src/main/java/io/trino/type/BlockTypeOperators.java +++ b/core/trino-main/src/main/java/io/trino/type/BlockTypeOperators.java @@ -284,6 +284,8 @@ public long getCacheRequestCount() @Managed public void cacheReset() { + // Note: this may not invalidate ongoing loads (https://github.com/trinodb/trino/issues/10512, https://github.com/google/guava/issues/1881) + // This is acceptable, since this operation is invoked manually, and not relied upon for correctness. generatedBlockOperatorCache.invalidateAll(); } } diff --git a/core/trino-main/src/main/java/io/trino/type/TypeOperatorsCache.java b/core/trino-main/src/main/java/io/trino/type/TypeOperatorsCache.java index 37fb4585749a..027f3adda002 100644 --- a/core/trino-main/src/main/java/io/trino/type/TypeOperatorsCache.java +++ b/core/trino-main/src/main/java/io/trino/type/TypeOperatorsCache.java @@ -71,6 +71,8 @@ public long getCacheRequestCount() @Managed public void cacheReset() { + // Note: this may not invalidate ongoing loads (https://github.com/trinodb/trino/issues/10512, https://github.com/google/guava/issues/1881) + // This is acceptable, since this operation is invoked manually, and not relied upon for correctness. cache.invalidateAll(); } } diff --git a/lib/trino-plugin-toolkit/pom.xml b/lib/trino-plugin-toolkit/pom.xml index b044c6efda7e..301f55fabd19 100644 --- a/lib/trino-plugin-toolkit/pom.xml +++ b/lib/trino-plugin-toolkit/pom.xml @@ -28,6 +28,11 @@ bootstrap + + io.airlift + concurrent + + io.airlift configuration diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableCache.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableCache.java new file mode 100644 index 000000000000..5ecaa527b62b --- /dev/null +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableCache.java @@ -0,0 +1,195 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.base.cache; + +import com.google.common.cache.AbstractCache; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheStats; +import com.google.common.util.concurrent.SettableFuture; + +import javax.annotation.CheckForNull; + +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static io.airlift.concurrent.MoreFutures.getDone; +import static java.lang.System.nanoTime; +import static java.util.Objects.requireNonNull; + +/** + * A Cache implementation similar to ones produced by {@code com.google.common.cache.CacheBuilder}, + * but one that does not exhibits Guava issue #1881, i.e. + * a {@link #getIfPresent(Object)} after {@link #invalidate(Object)} is guaranteed to return {@code null} and + * {@link #get(Object, Callable)} after {@link #invalidate(Object)} is guaranteed to load a fresh value. + */ +public class EvictableCache + extends AbstractCache + implements Cache +{ + /** + * @apiNote Piggy-back on {@link CacheBuilder} for cache TTL. + */ + public static EvictableCache buildWith(CacheBuilder cacheBuilder) + { + return new EvictableCache<>(cacheBuilder); + } + + // private final Map> map = new ConcurrentHashMap<>(); + private final Cache> delegate; + + private final StatsCounter statsCounter = new SimpleStatsCounter(); + + private EvictableCache(CacheBuilder cacheBuilder) + { + requireNonNull(cacheBuilder, "cacheBuilder is null"); + this.delegate = cacheBuilder.build(); + } + + @CheckForNull + @Override + public V getIfPresent(Object key) + { + Future future = delegate.getIfPresent(key); + if (future != null && future.isDone()) { + statsCounter.recordHits(1); + return getDone(future); + } + statsCounter.recordMisses(1); + return null; + } + + @Override + public V get(K key, Callable loader) + throws ExecutionException + { + requireNonNull(key, "key is null"); + requireNonNull(loader, "loader is null"); + + while (true) { + SettableFuture newFuture = SettableFuture.create(); + Future future = delegate.asMap().computeIfAbsent(key, ignored -> newFuture); + if (future.isDone() && !future.isCancelled()) { + statsCounter.recordHits(1); + return getDone(future); + } + + statsCounter.recordMisses(1); + if (future == newFuture) { + // We put the future in. + + V computed; + long loadStartNanos = nanoTime(); + try { + computed = loader.call(); + requireNonNull(computed, "computed is null"); + } + catch (Exception e) { + statsCounter.recordLoadException(nanoTime() - loadStartNanos); + delegate.asMap().remove(key, newFuture); + // wake up waiters, let them retry + newFuture.cancel(false); + throw new ExecutionException(e); + } + statsCounter.recordLoadSuccess(nanoTime() - loadStartNanos); + newFuture.set(computed); + return computed; + } + + // Someone else is loading the key, let's wait. + try { + return future.get(); + } + catch (CancellationException e) { + // Invalidated, or load failed + } + catch (ExecutionException e) { + // Should never happen + throw new IllegalStateException("Future unexpectedly completed with exception", e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted", e); + } + + // Someone else was loading the key, but the load was invalidated. + } + } + + @Override + public void put(K key, V value) + { + throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation. Use get(key, callable) instead."); + } + + @Override + public void invalidate(Object key) + { + delegate.invalidate(key); + } + + @Override + public void invalidateAll(Iterable keys) + { + delegate.invalidateAll(keys); + } + + @Override + public void invalidateAll() + { + delegate.invalidateAll(); + } + + @Override + public long size() + { + // Includes entries being computed. Approximate, as allowed per method contract. + return delegate.size(); + } + + @Override + public CacheStats stats() + { + return statsCounter.snapshot().plus( + new CacheStats( + 0, + 0, + 0, + 0, + 0, + delegate.stats().evictionCount())); + } + + @Override + public ConcurrentMap asMap() + { + // TODO implement and remove non-interface keySet() + throw new UnsupportedOperationException(); + } + + public Set keySet() + { + return delegate.asMap().keySet(); + } + + @Override + public void cleanUp() + { + delegate.cleanUp(); + } +} diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java new file mode 100644 index 000000000000..b8c5df9557ef --- /dev/null +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java @@ -0,0 +1,201 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.base.cache; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheStats; +import io.airlift.concurrent.MoreFutures; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static java.lang.String.format; +import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class TestEvictableCache +{ + private static final int TEST_TIMEOUT_MILLIS = 10_000; + + private enum Invalidation + { + INVALIDATE_KEY, + INVALIDATE_PREDEFINED_KEYS, + INVALIDATE_SELECTED_KEYS, + INVALIDATE_ALL, + } + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testLoad() + throws Exception + { + Cache cache = EvictableCache.buildWith( + CacheBuilder.newBuilder()); + + assertEquals(cache.stats(), new CacheStats(0, 0, 0, 0, 0, 0)); + String value = cache.get(42, () -> "abc"); + assertEquals(cache.stats(), new CacheStats(0, 1, 1, 0, cache.stats().totalLoadTime(), 0)); + assertEquals(value, "abc"); + } + + /** + * Covers https://github.com/google/guava/issues/1881 + */ + @Test(timeOut = TEST_TIMEOUT_MILLIS, dataProvider = "invalidations") + public void testInvalidateOngoingLoad(Invalidation invalidation) + throws Exception + { + Cache cache = EvictableCache.buildWith(CacheBuilder.newBuilder()); + Integer key = 42; + + CountDownLatch loadOngoing = new CountDownLatch(1); + CountDownLatch invalidated = new CountDownLatch(1); + ExecutorService executor = newFixedThreadPool(2); + try { + // thread A + Future threadA = executor.submit(() -> { + return cache.get(key, () -> { + loadOngoing.countDown(); // 1 + assertTrue(invalidated.await(10, SECONDS)); // 2 + return "stale value"; + }); + }); + + // thread B + Future threadB = executor.submit(() -> { + assertTrue(loadOngoing.await(10, SECONDS)); // 1 + + switch (invalidation) { + case INVALIDATE_KEY: + cache.invalidate(key); + break; + case INVALIDATE_PREDEFINED_KEYS: + cache.invalidateAll(List.of(key)); + break; + case INVALIDATE_SELECTED_KEYS: + Set keys = ((EvictableCache) cache).keySet().stream() + .filter(foundKey -> (int) foundKey == key) + .collect(toImmutableSet()); + cache.invalidateAll(keys); + break; + case INVALIDATE_ALL: + cache.invalidateAll(); + break; + } + + invalidated.countDown(); // 2 + + return cache.get(key, () -> "fresh value"); + }); + + assertEquals(threadA.get(), "stale value"); + assertEquals(threadB.get(), "fresh value"); + } + finally { + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + } + + /** + * Covers https://github.com/google/guava/issues/1881 + */ + @Test(invocationCount = 10, timeOut = TEST_TIMEOUT_MILLIS, dataProvider = "invalidations") + public void testInvalidateAndLoadConcurrently(Invalidation invalidation) + throws Exception + { + int[] primes = {2, 3, 5, 7}; + AtomicLong remoteState = new AtomicLong(1); + + Cache cache = EvictableCache.buildWith(CacheBuilder.newBuilder()); + Integer key = 42; + int threads = 4; + + CyclicBarrier barrier = new CyclicBarrier(threads); + ExecutorService executor = newFixedThreadPool(threads); + try { + List> futures = IntStream.range(0, threads) + .mapToObj(threadNumber -> executor.submit(() -> { + // prime the cache + assertEquals((long) cache.get(key, remoteState::get), 1L); + int prime = primes[threadNumber]; + + barrier.await(10, SECONDS); + + // modify underlying state + remoteState.updateAndGet(current -> current * prime); + + // invalidate + switch (invalidation) { + case INVALIDATE_KEY: + cache.invalidate(key); + break; + case INVALIDATE_PREDEFINED_KEYS: + cache.invalidateAll(List.of(key)); + break; + case INVALIDATE_SELECTED_KEYS: + Set keys = ((EvictableCache) cache).keySet().stream() + .filter(foundKey -> (int) foundKey == key) + .collect(toImmutableSet()); + cache.invalidateAll(keys); + break; + case INVALIDATE_ALL: + cache.invalidateAll(); + break; + } + + // read through cache + long current = cache.get(key, remoteState::get); + if (current % prime != 0) { + fail(format("The value read through cache (%s) in thread (%s) is not divisable by (%s)", current, threadNumber, prime)); + } + + return (Void) null; + })) + .collect(toImmutableList()); + + futures.forEach(MoreFutures::getFutureValue); + + assertEquals(remoteState.get(), 2 * 3 * 5 * 7); + assertEquals((long) cache.get(key, remoteState::get), remoteState.get()); + } + finally { + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + } + + @DataProvider + public static Object[][] invalidations() + { + return Stream.of(Invalidation.values()) + .map(invalidation -> new Object[] {invalidation}) + .toArray(Object[][]::new); + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java index 73c33845e46c..f5aeeb4722a4 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.jmx.CacheStatsMBean; import io.airlift.units.Duration; +import io.trino.plugin.base.cache.EvictableCache; import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.jdbc.IdentityCacheMapping.IdentityCacheKey; import io.trino.spi.TrinoException; @@ -121,11 +122,11 @@ public CachingJdbcClient( cacheBuilder.maximumSize(cacheMaximumSize); } - schemaNamesCache = cacheBuilder.build(); - tableNamesCache = cacheBuilder.build(); - tableHandleCache = cacheBuilder.build(); - columnsCache = cacheBuilder.build(); - statisticsCache = cacheBuilder.build(); + schemaNamesCache = EvictableCache.buildWith(cacheBuilder); + tableNamesCache = EvictableCache.buildWith(cacheBuilder); + tableHandleCache = EvictableCache.buildWith(cacheBuilder); + columnsCache = EvictableCache.buildWith(cacheBuilder); + statisticsCache = EvictableCache.buildWith(cacheBuilder); } @Override @@ -272,13 +273,12 @@ public Optional getTableHandle(ConnectorSession session, Schema Optional cachedTableHandle = tableHandleCache.getIfPresent(key); //noinspection OptionalAssignedToNull if (cachedTableHandle != null) { - return cachedTableHandle; - } - Optional tableHandle = delegate.getTableHandle(session, schemaTableName); - if (tableHandle.isPresent() || cacheMissing) { - tableHandleCache.put(key, tableHandle); + if (cacheMissing || cachedTableHandle.isPresent()) { + return cachedTableHandle; + } + tableHandleCache.invalidate(key); } - return tableHandle; + return get(tableHandleCache, key, () -> delegate.getTableHandle(session, schemaTableName)); } @Override @@ -341,14 +341,12 @@ public TableStatistics getTableStatistics(ConnectorSession session, JdbcTableHan TableStatistics cachedStatistics = statisticsCache.getIfPresent(key); if (cachedStatistics != null) { - return cachedStatistics; - } - - TableStatistics statistics = delegate.getTableStatistics(session, handle, tupleDomain); - if (!statistics.equals(TableStatistics.empty()) || cacheMissing) { - statisticsCache.put(key, statistics); + if (cacheMissing || !cachedStatistics.equals(TableStatistics.empty())) { + return cachedStatistics; + } + statisticsCache.invalidate(key); } - return statistics; + return get(statisticsCache, key, () -> delegate.getTableStatistics(session, handle, tupleDomain)); } @Override @@ -547,7 +545,7 @@ CacheStats getStatisticsCacheStats() private static void invalidateCache(Cache cache, Predicate filterFunction) { - Set cacheKeys = cache.asMap().keySet().stream() + Set cacheKeys = ((EvictableCache) cache).keySet().stream() .filter(filterFunction) .collect(toImmutableSet()); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/mapping/CachingIdentifierMapping.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/mapping/CachingIdentifierMapping.java index 68ea4f4897a4..da6caaa579b0 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/mapping/CachingIdentifierMapping.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/mapping/CachingIdentifierMapping.java @@ -73,6 +73,8 @@ public CachingIdentifierMapping( public void flushCache() { + // Note: this may not invalidate ongoing loads (https://github.com/trinodb/trino/issues/10512, https://github.com/google/guava/issues/1881) + // This is acceptable, since this operation is invoked manually, and not relied upon for correctness. remoteSchemaNames.invalidateAll(); remoteTableNames.invalidateAll(); } diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java index a9f3db3f191e..4e49c8cd5d4b 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java @@ -16,6 +16,7 @@ import com.google.common.cache.CacheStats; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Futures; import io.airlift.units.Duration; import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.jdbc.credential.ExtraCredentialConfig; @@ -35,18 +36,27 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.security.SecureRandom; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.function.Supplier; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.spi.session.PropertyMetadata.stringProperty; import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.testing.TestingConnectorSession.builder; +import static java.lang.Character.MAX_RADIX; +import static java.lang.Math.abs; +import static java.lang.Math.min; import static java.util.Collections.emptyList; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; @@ -55,6 +65,10 @@ @Test(singleThreaded = true) public class TestCachingJdbcClient { + private static final SecureRandom random = new SecureRandom(); + // The suffix needs to be long enough to "prevent" collisions in practice. The length of 5 was proven not to be long enough + private static final int RANDOM_SUFFIX_LENGTH = 10; + private static final Duration FOREVER = Duration.succinctDuration(1, DAYS); private static final Duration ZERO = Duration.succinctDuration(0, MILLISECONDS); @@ -79,6 +93,7 @@ public class TestCachingJdbcClient private CachingJdbcClient cachingJdbcClient; private JdbcClient jdbcClient; private String schema; + private ExecutorService executor; @BeforeMethod public void setUp() @@ -88,6 +103,7 @@ public void setUp() cachingJdbcClient = createCachingJdbcClient(true, 10000); jdbcClient = database.getJdbcClient(); schema = jdbcClient.getSchemaNames(SESSION).iterator().next(); + executor = newCachedThreadPool(daemonThreadsNamed("TestCachingJdbcClient-%s")); } private CachingJdbcClient createCachingJdbcClient(Duration cacheTtl, boolean cacheMissing, long cacheMaximumSize) @@ -104,6 +120,7 @@ private CachingJdbcClient createCachingJdbcClient(boolean cacheMissing, long cac public void tearDown() throws Exception { + executor.shutdownNow(); database.close(); } @@ -342,7 +359,7 @@ public void testGetTableStatistics() JdbcTableHandle second = createTable(new SchemaTableName(schema, "second")); // load first - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, first, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -352,7 +369,7 @@ public void testGetTableStatistics() }); // load second - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, second, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -366,7 +383,7 @@ public void testGetTableStatistics() JdbcTableHandle secondFirst = createTable(new SchemaTableName(schema, "first")); // load first again - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, secondFirst, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -398,7 +415,7 @@ public void testCacheGetTableStatisticsWithQueryRelationHandle() 0); // load - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, queryOnFirst, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -419,7 +436,7 @@ public void testCacheGetTableStatisticsWithQueryRelationHandle() cachingJdbcClient.dropTable(SESSION, first); // load again - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, queryOnFirst, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); } @@ -433,7 +450,7 @@ public void testTruncateTable() JdbcTableHandle table = createTable(new SchemaTableName(schema, "table")); // load - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, table, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -446,7 +463,7 @@ public void testTruncateTable() cachingJdbcClient.truncateTable(SESSION, table); // load again - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, table, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -486,7 +503,7 @@ public void testCacheEmptyStatistics() ConnectorSession session = createSession("table"); JdbcTableHandle table = createTable(new SchemaTableName(schema, "table")); - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, table, TupleDomain.all())).isEqualTo(TableStatistics.empty()); }); @@ -505,11 +522,11 @@ public void testGetTableStatisticsDoNotCacheEmptyWhenCachingMissingIsDisabled() ConnectorSession session = createSession("table"); JdbcTableHandle table = createTable(new SchemaTableName(schema, "table")); - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, table, TupleDomain.all())).isEqualTo(TableStatistics.empty()); }); - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).hits(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, table, TupleDomain.all())).isEqualTo(TableStatistics.empty()); }); @@ -557,7 +574,7 @@ public void testFlushCache() JdbcTableHandle first = createTable(new SchemaTableName(schema, "atable")); // load table - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, first, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -571,7 +588,7 @@ public void testFlushCache() JdbcTableHandle secondFirst = createTable(new SchemaTableName(schema, "first")); // load table again - assertStatisticsCacheStats(cachingJdbcClient).misses(1).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, secondFirst, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -584,6 +601,27 @@ public void testFlushCache() jdbcClient.dropTable(SESSION, first); } + @Test(timeOut = 60_000) + public void testConcurrentSchemaCreateAndDrop() + { + CachingJdbcClient cachingJdbcClient = cachingStatisticsAwareJdbcClient(FOREVER, true, 10000); + ConnectorSession session = createSession("asession"); + List> futures = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + futures.add(executor.submit(() -> { + String schemaName = "schema_" + randomSuffix(); + assertThat(cachingJdbcClient.getSchemaNames(session)).doesNotContain(schemaName); + cachingJdbcClient.createSchema(session, schemaName); + assertThat(cachingJdbcClient.getSchemaNames(session)).contains(schemaName); + cachingJdbcClient.dropSchema(session, schemaName); + assertThat(cachingJdbcClient.getSchemaNames(session)).doesNotContain(schemaName); + return null; + })); + } + + futures.forEach(Futures::getUnchecked); + } + private JdbcTableHandle getAnyTable(String schema) { SchemaTableName tableName = jdbcClient.getTableNames(SESSION, Optional.of(schema)) @@ -700,4 +738,10 @@ public void afterRunning(Runnable runnable) .isEqualTo(expectedMisses); } } + + private static String randomSuffix() + { + String randomSuffix = Long.toString(abs(random.nextLong()), MAX_RADIX); + return randomSuffix.substring(0, min(RANDOM_SUFFIX_LENGTH, randomSuffix.length())); + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryLister.java index 06649dfed042..8983f44467ff 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryLister.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryLister.java @@ -144,6 +144,8 @@ public LocatedFileStatus next() @Managed public void flushCache() { + // Note: this may not invalidate ongoing loads (https://github.com/trinodb/trino/issues/10512, https://github.com/google/guava/issues/1881) + // This is acceptable, since this operation is invoked manually, and not relied upon for correctness. cache.invalidateAll(); } diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java index f251bea6cb25..c5de9fe2a8fc 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java @@ -14,16 +14,14 @@ package io.trino.plugin.mongodb; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Primitives; import com.google.common.primitives.Shorts; import com.google.common.primitives.SignedBytes; -import com.google.common.util.concurrent.UncheckedExecutionException; import com.mongodb.DBRef; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoClient; @@ -34,6 +32,7 @@ import com.mongodb.client.result.DeleteResult; import io.airlift.log.Logger; import io.airlift.slice.Slice; +import io.trino.plugin.base.cache.EvictableCache; import io.trino.spi.HostAddress; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; @@ -64,6 +63,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -86,7 +86,6 @@ import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; @@ -123,7 +122,7 @@ public class MongoSession private final boolean caseInsensitiveNameMatching; private final int cursorBatchSize; - private final LoadingCache tableCache; + private final Cache tableCache; private final String implicitPrefix; public MongoSession(TypeManager typeManager, MongoClient client, MongoClientConfig config) @@ -135,10 +134,8 @@ public MongoSession(TypeManager typeManager, MongoClient client, MongoClientConf this.cursorBatchSize = config.getCursorBatchSize(); this.implicitPrefix = requireNonNull(config.getImplicitRowFieldPrefix(), "config.getImplicitRowFieldPrefix() is null"); - this.tableCache = CacheBuilder.newBuilder() - .expireAfterWrite(1, HOURS) // TODO: Configure - .refreshAfterWrite(1, MINUTES) - .build(CacheLoader.from(this::loadTableSchema)); + this.tableCache = EvictableCache.buildWith(CacheBuilder.newBuilder() + .expireAfterWrite(1, MINUTES)); // TODO: Configure } public void shutdown() @@ -179,11 +176,11 @@ public MongoTable getTable(SchemaTableName tableName) throws TableNotFoundException { try { - return tableCache.getUnchecked(tableName); + return tableCache.get(tableName, () -> loadTableSchema(tableName)); } - catch (UncheckedExecutionException e) { + catch (ExecutionException e) { throwIfInstanceOf(e.getCause(), TrinoException.class); - throw e; + throw new RuntimeException(e); } } diff --git a/plugin/trino-password-authenticators/src/main/java/io/trino/plugin/password/ldap/LdapAuthenticator.java b/plugin/trino-password-authenticators/src/main/java/io/trino/plugin/password/ldap/LdapAuthenticator.java index 31dfc569e89c..12b563c43a40 100644 --- a/plugin/trino-password-authenticators/src/main/java/io/trino/plugin/password/ldap/LdapAuthenticator.java +++ b/plugin/trino-password-authenticators/src/main/java/io/trino/plugin/password/ldap/LdapAuthenticator.java @@ -92,6 +92,8 @@ public LdapAuthenticator(LdapAuthenticatorClient client, LdapConfig ldapConfig) @VisibleForTesting void invalidateCache() { + // Note: this may not invalidate ongoing loads (https://github.com/trinodb/trino/issues/10512, https://github.com/google/guava/issues/1881) + // This is acceptable, since this operation is invoked in tests only, and not relied upon for correctness. authenticationCache.invalidateAll(); } diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java index 538282134b4a..0a09b31068d2 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.java @@ -427,6 +427,8 @@ public ListenableFuture load(MissingShard missingShard) missingShard.getShardXxhash64(), missingShard.isActive()); ListenableFuture future = shardRecoveryExecutor.submit(task); + // TODO this may not invalidate ongoing loads (https://github.com/trinodb/trino/issues/10512, https://github.com/google/guava/issues/1881). + // Determine whether this is OK here. future.addListener(() -> queuedMissingShards.invalidate(missingShard), directExecutor()); return future; }