Skip to content

Commit

Permalink
OAK-11072: (fix) flaky test: ElasticReliabilityTest.connectionCutOnQu…
Browse files Browse the repository at this point in the history
…ery (apache#1710)

* OAK-11072: (fix) flaky test: ElasticReliabilityTest.connectionCutOnQuery

* OAK-11072: refactor ElasticIndexStatistics to allow cache properties changes when running it in a test suite
  • Loading branch information
fabriziofortino authored Sep 12, 2024
1 parent 0eb6c30 commit 58bde0a
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@
* </ul>
*/
public class ElasticIndexStatistics implements IndexStatistics {
private static final Long MAX_SIZE = Long.getLong("oak.elastic.statsMaxSize", 10000);
private static final Long EXPIRE_SECONDS = Long.getLong("oak.elastic.statsExpireSeconds", 10 * 60);
private static final Long REFRESH_SECONDS = Long.getLong("oak.elastic.statsRefreshSeconds", 60);

private static final LoadingCache<StatsRequestDescriptor, Integer> DEFAULT_COUNT_CACHE =
setupCountCache(MAX_SIZE, EXPIRE_SECONDS, REFRESH_SECONDS, null);

private static final LoadingCache<StatsRequestDescriptor, StatsResponse> STATS_CACHE =
setupCache(MAX_SIZE, EXPIRE_SECONDS, REFRESH_SECONDS, new StatsCacheLoader(), null);
private static final String MAX_SIZE = "oak.elastic.statsMaxSize";
private static final Long MAX_SIZE_DEFAULT = 10000L;
private static final String EXPIRE_SECONDS = "oak.elastic.statsExpireSeconds";
private static final Long EXPIRE_SECONDS_DEFAULT = 10 * 60L;
private static final String REFRESH_SECONDS = "oak.elastic.statsRefreshSeconds";
private static final Long REFRESH_SECONDS_DEFAULT = 60L;

private static final ExecutorService REFRESH_EXECUTOR = new ThreadPoolExecutor(
0, 4, 60L, TimeUnit.SECONDS,
Expand All @@ -78,19 +76,25 @@ public class ElasticIndexStatistics implements IndexStatistics {
private final ElasticConnection elasticConnection;
private final ElasticIndexDefinition indexDefinition;
private final LoadingCache<StatsRequestDescriptor, Integer> countCache;
private final LoadingCache<StatsRequestDescriptor, StatsResponse> statsCache;

ElasticIndexStatistics(@NotNull ElasticConnection elasticConnection,
@NotNull ElasticIndexDefinition indexDefinition) {
this(elasticConnection, indexDefinition, DEFAULT_COUNT_CACHE);
this(elasticConnection, indexDefinition, null, null);
}

@TestOnly
ElasticIndexStatistics(@NotNull ElasticConnection elasticConnection,
@NotNull ElasticIndexDefinition indexDefinition,
@NotNull LoadingCache<StatsRequestDescriptor, Integer> countCache) {
@Nullable LoadingCache<StatsRequestDescriptor, Integer> countCache,
@Nullable LoadingCache<StatsRequestDescriptor, StatsResponse> statsCache) {
this.elasticConnection = elasticConnection;
this.indexDefinition = indexDefinition;
this.countCache = countCache;
this.countCache = Objects.requireNonNullElseGet(countCache, () ->
setupCountCache(getCacheMaxSize(), getCacheExpireSeconds(), getCacheRefreshSeconds(), null));
this.statsCache = Objects.requireNonNullElseGet(statsCache, () ->
setupCache(getCacheMaxSize(), getCacheExpireSeconds(), getCacheRefreshSeconds(), new StatsCacheLoader(), null));

}

/**
Expand Down Expand Up @@ -127,7 +131,7 @@ public int getDocCountFor(Query query) {
* {@code ElasticIndexDefinition}.
*/
public long primaryStoreSize() {
return STATS_CACHE.getUnchecked(
return statsCache.getUnchecked(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).primaryStoreSize;
}
Expand All @@ -137,7 +141,7 @@ public long primaryStoreSize() {
* primary shards and replica shards.
*/
public long storeSize() {
return STATS_CACHE.getUnchecked(
return statsCache.getUnchecked(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).storeSize;
}
Expand All @@ -146,7 +150,7 @@ public long storeSize() {
* Returns the creation date for the remote index bound to the {@code ElasticIndexDefinition}.
*/
public long creationDate() {
return STATS_CACHE.getUnchecked(
return statsCache.getUnchecked(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).creationDate;
}
Expand All @@ -156,7 +160,7 @@ public long creationDate() {
* {@code ElasticIndexDefinition}. This document count includes hidden nested documents.
*/
public int luceneNumDocs() {
return STATS_CACHE.getUnchecked(
return statsCache.getUnchecked(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).luceneDocsCount;
}
Expand All @@ -166,7 +170,7 @@ public int luceneNumDocs() {
* {@code ElasticIndexDefinition}. This document count includes hidden nested documents.
*/
public int luceneNumDeletedDocs() {
return STATS_CACHE.getUnchecked(
return statsCache.getUnchecked(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).luceneDocsDeleted;
}
Expand All @@ -188,6 +192,18 @@ static <K, V> LoadingCache<K, V> setupCache(long maxSize, long expireSeconds, lo
return cacheBuilder.build(cacheLoader);
}

private Long getCacheMaxSize() {
return Long.getLong(MAX_SIZE, MAX_SIZE_DEFAULT);
}

private Long getCacheExpireSeconds() {
return Long.getLong(EXPIRE_SECONDS, EXPIRE_SECONDS_DEFAULT);
}

private Long getCacheRefreshSeconds() {
return Long.getLong(REFRESH_SECONDS, REFRESH_SECONDS_DEFAULT);
}

static class CountCacheLoader extends CacheLoader<StatsRequestDescriptor, Integer> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import co.elastic.clients.elasticsearch.core.CountResponse;
import org.apache.jackrabbit.guava.common.base.Ticker;
import org.apache.jackrabbit.guava.common.cache.LoadingCache;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
Expand Down Expand Up @@ -53,13 +54,20 @@ public class ElasticIndexStatisticsTest {
@Mock
private ElasticsearchClient elasticClientMock;

private AutoCloseable closeable;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
this.closeable = MockitoAnnotations.openMocks(this);
when(indexDefinitionMock.getIndexAlias()).thenReturn("test-index");
when(elasticConnectionMock.getClient()).thenReturn(elasticClientMock);
}

@After
public void releaseMocks() throws Exception {
closeable.close();
}

@Test
public void defaultIndexStatistics() {
ElasticIndexStatistics indexStatistics =
Expand All @@ -73,7 +81,7 @@ public void cachedStatistics() throws Exception {
LoadingCache<ElasticIndexStatistics.StatsRequestDescriptor, Integer> cache =
ElasticIndexStatistics.setupCountCache(100, 10 * 60, 60, ticker);
ElasticIndexStatistics indexStatistics =
new ElasticIndexStatistics(elasticConnectionMock, indexDefinitionMock, cache);
new ElasticIndexStatistics(elasticConnectionMock, indexDefinitionMock, cache, null);

CountResponse countResponse = mock(CountResponse.class);
when(countResponse.count()).thenReturn(100L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import eu.rekawek.toxiproxy.model.toxic.LimitData;
import org.apache.jackrabbit.oak.api.Tree;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.ProvideSystemProperty;
import org.junit.contrib.java.lang.system.RestoreSystemProperties;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.containsString;
Expand All @@ -37,6 +40,15 @@

public class ElasticReliabilityTest extends ElasticAbstractQueryTest {

// set cache expiration and refresh to low values to avoid cached results in tests
@Rule
public final ProvideSystemProperty updateSystemProperties
= new ProvideSystemProperty("oak.elastic.statsExpireSeconds", "5")
.and("oak.elastic.statsRefreshSeconds", "1");

@Rule
public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();

private static final DockerImageName TOXIPROXY_IMAGE = DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.9.0");

private ToxiproxyContainer toxiproxy;
Expand Down Expand Up @@ -82,25 +94,34 @@ public void connectionCutOnQuery() throws Exception {
test.addChild("a").setProperty("propa", "a");
test.addChild("b").setProperty("propa", "c");
test.addChild("c").setProperty("propb", "e");
root.commit(Collections.singletonMap("sync-mode", "rt"));
root.commit(Map.of("sync-mode", "rt"));

String query = "select [jcr:path] from [nt:base] where propa is not null";

assertEventually(() -> {
assertThat(explain(query), containsString("elasticsearch:" + indexName));
assertQuery(query, List.of("/test/a", "/test/b"));
});

// simulate an upstream connection cut
LimitData cutConnectionUpstream = proxy.toxics()
.limitData("CUT_CONNECTION_UPSTREAM", ToxicDirection.UPSTREAM, 0L);

// elastic is down, query should not use it
assertThat(explain(query), not(containsString("elasticsearch:" + indexName)));
assertEventually(() -> {
// elastic is down, query should not use it
assertThat(explain(query), not(containsString("elasticsearch:" + indexName)));

// result set should be correct anyway since traversal is enabled
assertQuery(query, Arrays.asList("/test/a", "/test/b"));
// result set should be correct anyway since traversal is enabled
assertQuery(query, List.of("/test/a", "/test/b"));
});

// re-establish connection
cutConnectionUpstream.remove();

// result set should be the same as before but this time elastic should be used
assertThat(explain(query), containsString("elasticsearch:" + indexName));
assertQuery(query, Arrays.asList("/test/a", "/test/b"));
assertEventually(() -> {
// result set should be the same as before but this time elastic should be used
assertThat(explain(query), containsString("elasticsearch:" + indexName));
assertQuery(query, List.of("/test/a", "/test/b"));
});
}
}

0 comments on commit 58bde0a

Please sign in to comment.