Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OAK-11072: (fix) flaky test: ElasticReliabilityTest.connectionCutOnQuery #1710

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@
* </ul>
*/
public class ElasticIndexStatistics implements IndexStatistics {
private static final Long MAX_SIZE = Long.getLong("oak.elastic.statsMaxSize", 10000);
private static final Long EXPIRE_SECONDS = Long.getLong("oak.elastic.statsExpireSeconds", 10 * 60);
private static final Long REFRESH_SECONDS = Long.getLong("oak.elastic.statsRefreshSeconds", 60);

private static final LoadingCache<StatsRequestDescriptor, Integer> DEFAULT_COUNT_CACHE =
setupCountCache(MAX_SIZE, EXPIRE_SECONDS, REFRESH_SECONDS, null);

private static final LoadingCache<StatsRequestDescriptor, StatsResponse> STATS_CACHE =
setupCache(MAX_SIZE, EXPIRE_SECONDS, REFRESH_SECONDS, new StatsCacheLoader(), null);
private static final String MAX_SIZE = "oak.elastic.statsMaxSize";
private static final Long MAX_SIZE_DEFAULT = 10000L;
private static final String EXPIRE_SECONDS = "oak.elastic.statsExpireSeconds";
private static final Long EXPIRE_SECONDS_DEFAULT = 10 * 60L;
private static final String REFRESH_SECONDS = "oak.elastic.statsRefreshSeconds";
private static final Long REFRESH_SECONDS_DEFAULT = 60L;

private static final ExecutorService REFRESH_EXECUTOR = new ThreadPoolExecutor(
0, 4, 60L, TimeUnit.SECONDS,
Expand All @@ -78,19 +76,25 @@ public class ElasticIndexStatistics implements IndexStatistics {
private final ElasticConnection elasticConnection;
private final ElasticIndexDefinition indexDefinition;
private final LoadingCache<StatsRequestDescriptor, Integer> countCache;
private final LoadingCache<StatsRequestDescriptor, StatsResponse> statsCache;

ElasticIndexStatistics(@NotNull ElasticConnection elasticConnection,
@NotNull ElasticIndexDefinition indexDefinition) {
this(elasticConnection, indexDefinition, DEFAULT_COUNT_CACHE);
this(elasticConnection, indexDefinition, null, null);
}

@TestOnly
ElasticIndexStatistics(@NotNull ElasticConnection elasticConnection,
@NotNull ElasticIndexDefinition indexDefinition,
@NotNull LoadingCache<StatsRequestDescriptor, Integer> countCache) {
@Nullable LoadingCache<StatsRequestDescriptor, Integer> countCache,
@Nullable LoadingCache<StatsRequestDescriptor, StatsResponse> statsCache) {
this.elasticConnection = elasticConnection;
this.indexDefinition = indexDefinition;
this.countCache = countCache;
this.countCache = Objects.requireNonNullElseGet(countCache, () ->
setupCountCache(getCacheMaxSize(), getCacheExpireSeconds(), getCacheRefreshSeconds(), null));
this.statsCache = Objects.requireNonNullElseGet(statsCache, () ->
setupCache(getCacheMaxSize(), getCacheExpireSeconds(), getCacheRefreshSeconds(), new StatsCacheLoader(), null));

}

/**
Expand Down Expand Up @@ -127,7 +131,7 @@ public int getDocCountFor(Query query) {
* {@code ElasticIndexDefinition}.
*/
public long primaryStoreSize() {
return STATS_CACHE.getUnchecked(
return statsCache.getUnchecked(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).primaryStoreSize;
}
Expand All @@ -137,7 +141,7 @@ public long primaryStoreSize() {
* primary shards and replica shards.
*/
public long storeSize() {
return STATS_CACHE.getUnchecked(
return statsCache.getUnchecked(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).storeSize;
}
Expand All @@ -146,7 +150,7 @@ public long storeSize() {
* Returns the creation date for the remote index bound to the {@code ElasticIndexDefinition}.
*/
public long creationDate() {
return STATS_CACHE.getUnchecked(
return statsCache.getUnchecked(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).creationDate;
}
Expand All @@ -156,7 +160,7 @@ public long creationDate() {
* {@code ElasticIndexDefinition}. This document count includes hidden nested documents.
*/
public int luceneNumDocs() {
return STATS_CACHE.getUnchecked(
return statsCache.getUnchecked(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).luceneDocsCount;
}
Expand All @@ -166,7 +170,7 @@ public int luceneNumDocs() {
* {@code ElasticIndexDefinition}. This document count includes hidden nested documents.
*/
public int luceneNumDeletedDocs() {
return STATS_CACHE.getUnchecked(
return statsCache.getUnchecked(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).luceneDocsDeleted;
}
Expand All @@ -188,6 +192,18 @@ static <K, V> LoadingCache<K, V> setupCache(long maxSize, long expireSeconds, lo
return cacheBuilder.build(cacheLoader);
}

private Long getCacheMaxSize() {
return Long.getLong(MAX_SIZE, MAX_SIZE_DEFAULT);
}

private Long getCacheExpireSeconds() {
return Long.getLong(EXPIRE_SECONDS, EXPIRE_SECONDS_DEFAULT);
}

private Long getCacheRefreshSeconds() {
return Long.getLong(REFRESH_SECONDS, REFRESH_SECONDS_DEFAULT);
}

static class CountCacheLoader extends CacheLoader<StatsRequestDescriptor, Integer> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import co.elastic.clients.elasticsearch.core.CountResponse;
import org.apache.jackrabbit.guava.common.base.Ticker;
import org.apache.jackrabbit.guava.common.cache.LoadingCache;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
Expand Down Expand Up @@ -53,13 +54,20 @@ public class ElasticIndexStatisticsTest {
@Mock
private ElasticsearchClient elasticClientMock;

private AutoCloseable closeable;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
this.closeable = MockitoAnnotations.openMocks(this);
when(indexDefinitionMock.getIndexAlias()).thenReturn("test-index");
when(elasticConnectionMock.getClient()).thenReturn(elasticClientMock);
}

@After
public void releaseMocks() throws Exception {
closeable.close();
}

@Test
public void defaultIndexStatistics() {
ElasticIndexStatistics indexStatistics =
Expand All @@ -73,7 +81,7 @@ public void cachedStatistics() throws Exception {
LoadingCache<ElasticIndexStatistics.StatsRequestDescriptor, Integer> cache =
ElasticIndexStatistics.setupCountCache(100, 10 * 60, 60, ticker);
ElasticIndexStatistics indexStatistics =
new ElasticIndexStatistics(elasticConnectionMock, indexDefinitionMock, cache);
new ElasticIndexStatistics(elasticConnectionMock, indexDefinitionMock, cache, null);

CountResponse countResponse = mock(CountResponse.class);
when(countResponse.count()).thenReturn(100L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import eu.rekawek.toxiproxy.model.toxic.LimitData;
import org.apache.jackrabbit.oak.api.Tree;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.ProvideSystemProperty;
import org.junit.contrib.java.lang.system.RestoreSystemProperties;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.containsString;
Expand All @@ -37,6 +40,15 @@

public class ElasticReliabilityTest extends ElasticAbstractQueryTest {

// set cache expiration and refresh to low values to avoid cached results in tests
@Rule
public final ProvideSystemProperty updateSystemProperties
= new ProvideSystemProperty("oak.elastic.statsExpireSeconds", "5")
.and("oak.elastic.statsRefreshSeconds", "1");

@Rule
public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();

private static final DockerImageName TOXIPROXY_IMAGE = DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.9.0");

private ToxiproxyContainer toxiproxy;
Expand Down Expand Up @@ -82,25 +94,34 @@ public void connectionCutOnQuery() throws Exception {
test.addChild("a").setProperty("propa", "a");
test.addChild("b").setProperty("propa", "c");
test.addChild("c").setProperty("propb", "e");
root.commit(Collections.singletonMap("sync-mode", "rt"));
root.commit(Map.of("sync-mode", "rt"));

String query = "select [jcr:path] from [nt:base] where propa is not null";

assertEventually(() -> {
assertThat(explain(query), containsString("elasticsearch:" + indexName));
assertQuery(query, List.of("/test/a", "/test/b"));
});

// simulate an upstream connection cut
LimitData cutConnectionUpstream = proxy.toxics()
.limitData("CUT_CONNECTION_UPSTREAM", ToxicDirection.UPSTREAM, 0L);

// elastic is down, query should not use it
assertThat(explain(query), not(containsString("elasticsearch:" + indexName)));
assertEventually(() -> {
// elastic is down, query should not use it
assertThat(explain(query), not(containsString("elasticsearch:" + indexName)));

// result set should be correct anyway since traversal is enabled
assertQuery(query, Arrays.asList("/test/a", "/test/b"));
// result set should be correct anyway since traversal is enabled
assertQuery(query, List.of("/test/a", "/test/b"));
});

// re-establish connection
cutConnectionUpstream.remove();

// result set should be the same as before but this time elastic should be used
assertThat(explain(query), containsString("elasticsearch:" + indexName));
assertQuery(query, Arrays.asList("/test/a", "/test/b"));
assertEventually(() -> {
// result set should be the same as before but this time elastic should be used
assertThat(explain(query), containsString("elasticsearch:" + indexName));
assertQuery(query, List.of("/test/a", "/test/b"));
});
}
}
Loading