Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/external multi collection #106

Merged
merged 2 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>com.github.github</groupId>
<artifactId>site-maven-plugin</artifactId>
Expand Down
49 changes: 23 additions & 26 deletions rre-core/src/main/java/io/sease/rre/core/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -445,25 +445,28 @@ private Stream<JsonNode> all(final JsonNode source, final String name) {
/**
* Prepares the search platform with the given index name and dataset.
*
* @param indexName the index name.
* @param data the dataset.
* @param collection the index name.
* @param dataToBeIndexed the dataset.
*/
private void prepareData(final String indexName, final File data) {
if (data != null) {
LOGGER.info("Preparing data for " + indexName + " from " + data.getAbsolutePath());
private void prepareData(final String collection, final File dataToBeIndexed) {
if (dataToBeIndexed != null) {
LOGGER.info("Preparing dataToBeIndexed for " + collection + " from " + dataToBeIndexed.getAbsolutePath());
} else {
LOGGER.info("Preparing platform for " + indexName);
LOGGER.info("Preparing platform for " + collection);
}

boolean corporaChanged = folderHasChanged(corporaFolder);

versionManager.getConfigurationVersionFolders().stream()
.filter(versionFolder -> (folderHasChanged(versionFolder) || corporaChanged || platform.isRefreshRequired()))
.flatMap(versionFolder -> stream(safe(versionFolder.listFiles(ONLY_NON_HIDDEN_FILES))))
.filter(file -> platform.isSearchPlatformFile(indexName, file))
Stream<File> searchCollectionsConfigs = versionManager.getConfigurationVersionFolders().stream()
.filter(versionFolder -> isConfigurationReloadNecessary(versionFolder))
.flatMap(versionFolder -> stream(safe(versionFolder.listFiles(ONLY_NON_HIDDEN_FILES))));
//each one of searchCollectionsConfigs stream elements is a full configuration for a search collection
searchCollectionsConfigs
.filter(file -> platform.isSearchPlatformConfiguration(collection, file))
.sorted()
.peek(file -> LOGGER.info("RRE: Loading the Test Collection into " + platform.getName() + ", configuration version " + file.getParentFile().getName()))
.forEach(fileOrFolder -> platform.load(data, fileOrFolder, indexFqdn(indexName, fileOrFolder.getParentFile().getName())));
.peek(searchPlatformConfiguration -> LOGGER.info("RRE: Loading the Search Engine " + platform.getName() + ", configuration version " + searchPlatformConfiguration.getParentFile().getName()))
.forEach(searchPlatformConfiguration -> {
String version = searchPlatformConfiguration.getParentFile().getName();
platform.load(dataToBeIndexed, searchPlatformConfiguration, collection, version);
});

LOGGER.info("RRE: " + platform.getName() + " has been correctly loaded.");

Expand All @@ -472,6 +475,11 @@ private void prepareData(final String indexName, final File data) {
LOGGER.info("RRE: target versions are " + String.join(",", versionManager.getConfigurationVersions()));
}

private boolean isConfigurationReloadNecessary( File versionFolder) {
boolean corporaChanged = folderHasChanged(corporaFolder);
return folderHasChanged(versionFolder) || corporaChanged || platform.isRefreshRequired();
}

private boolean folderHasChanged(File folder) {
boolean ret = true;

Expand All @@ -496,16 +504,5 @@ private void flushFileChecksums() {
}
}

/**
* Returns the FQDN of the target index that will be used.
* Starting from the index name declared in the configuration, RRE uses an internal naming (which adds the version
* name) for avoiding conflicts between versions.
*
* @param indexName the index name.
* @param version the current version.
* @return the FDQN of the target index that will be used.
*/
public static String indexFqdn(final String indexName, final String version) {
return (indexName + "_" + version).toLowerCase();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Collection<String> getVersions() {

QueryOrSearchResponse executeQuery(String indexName, String version, JsonNode queryNode, String defaultTemplate, int relevantDocCount) {
return platform.executeQuery(
Engine.indexFqdn(indexName, version),
indexName, version,
query(queryNode, defaultTemplate, version),
fields,
Math.max(10, relevantDocCount));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void setup() throws Exception {
// Set up template manager
when(templateManager.getTemplate(isNull(), eq(TEMPLATE), isA(String.class))).thenReturn(QUERY_TEMPLATE);
// Set up platform for each version query
versions.forEach(v -> when(platform.executeQuery(eq(INDEX_NAME + "_" + v), eq(QUERY_VALUE), any(String[].class), anyInt()))
versions.forEach(v -> when(platform.executeQuery(eq(INDEX_NAME), eq(v), eq(QUERY_VALUE), any(String[].class), anyInt()))
.thenReturn(new QueryOrSearchResponse(0, Collections.emptyList())));
}

Expand Down Expand Up @@ -124,7 +124,7 @@ private void verifyPersistence() {
}

private void verifySearchPlatform() {
versions.forEach(v -> verify(platform).executeQuery(eq(INDEX_NAME + "_" + v), eq(QUERY_VALUE), eq(fields), anyInt()));
versions.forEach(v -> verify(platform).executeQuery(eq(INDEX_NAME), eq(v), eq(QUERY_VALUE), eq(fields), anyInt()));
}

private static Query buildQuery() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,26 @@ public interface SearchPlatform extends Closeable {

/**
* Loads some data in a given index.
*
* @param corpus the data.
* @param dataToBeIndexed the data.
* @param configFolder the folder that contains the configuration for the given index.
* @param targetIndexName the name of the index where data will be indexed.
* @param collection the name of the index where data will be indexed.
* @param version the id of the configuration version
*/
void load(final File corpus, final File configFolder, final String targetIndexName);
void load(final File dataToBeIndexed, final File configFolder, final String collection, final String version);

/**
* Returns the FQDN of the target index that will be used.
* Starting from the index name declared in the configuration, RRE uses an internal naming (which adds the version
* name) for avoiding conflicts between versions.
*
* @param indexName the index name.
* @param version the current version.
* @return the FDQN of the target index that will be used.
*/
default String getFullyQualifiedDomainName(final String indexName, final String version) {
return (indexName + "_" + version).toLowerCase();
}

/**
* Starts this search platform.
*/
Expand All @@ -64,13 +77,14 @@ public interface SearchPlatform extends Closeable {
* Executes the given query.
* The semantic of the input query may change between concrete platforms
*
* @param indexName the index name that holds the data.
* @param collection the index name that holds the data.
* @param version the id of the configuration version
* @param query the query.
* @param fields the fields to return.
* @param maxRows the maximum number of rows that will be returned.
* @return the response of the query execution.
*/
QueryOrSearchResponse executeQuery(String indexName, String query, final String[] fields, int maxRows);
QueryOrSearchResponse executeQuery(String collection, String version, String query, final String[] fields, int maxRows);

/**
* Returns the name of this search platform.
Expand Down Expand Up @@ -98,7 +112,7 @@ public interface SearchPlatform extends Closeable {
* @return {@code true} if the file is a search platform config file or
* directory.
*/
boolean isSearchPlatformFile(String indexName, File file);
boolean isSearchPlatformConfiguration(String indexName, File file);

/**
* @return {@code true} if this platform requires a corpora file to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rre-search-platform-elasticsearch-impl</artifactId>
<version>7.4.2</version>
<version>7.5.0</version>
<name>RRE - Elasticsearch platform binding</name>
<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ public void beforeStart(final Map<String, Object> configuration) {
}

@Override
public void load(final File data, File indexShapeFile, String indexName) {
public void load(final File dataToBeIndexed, File indexShapeFile, String collection, String version) {
if (!indexShapeFile.getName().startsWith("index")) {
throw new IllegalArgumentException("Unable to find an index-shape (i.e. settings + mappings) within the configuration folder.");
}

String indexName = getFullyQualifiedDomainName(collection, version);
try {
final ObjectMapper mapper = new ObjectMapper();
final JsonNode esconfig = mapper.readTree(indexShapeFile);
Expand Down Expand Up @@ -173,7 +173,7 @@ public void load(final File data, File indexShapeFile, String indexName) {
proxy.admin().indices().create(request).actionGet();

final BulkRequest bulkRequest = new BulkRequest();
final List<String> lines = Files.readAllLines(data.toPath());
final List<String> lines = Files.readAllLines(dataToBeIndexed.toPath());

for (int i = 0; i < lines.size(); i += 2) {
JsonNode metadata = mapper.readTree(lines.get(i)).get("index");
Expand All @@ -189,7 +189,7 @@ public void load(final File data, File indexShapeFile, String indexName) {
if (response.hasFailures()) {
final String message =
"Unable to load datafile (" +
data.getAbsolutePath() +
dataToBeIndexed.getAbsolutePath() +
") in " +
getName() +
" using the index shape (" +
Expand Down Expand Up @@ -241,7 +241,8 @@ public void close() {
}

@Override
public QueryOrSearchResponse executeQuery(final String indexName, final String query, final String[] fields, final int maxRows) {
public QueryOrSearchResponse executeQuery(final String collection, String version, final String query, final String[] fields, final int maxRows) {
String indexName = getFullyQualifiedDomainName(collection, version);
try {
final SearchResponse qresponse = proxy.search(buildSearchRequest(indexName, query, fields, maxRows)).actionGet();
return convertResponse(qresponse);
Expand Down Expand Up @@ -324,7 +325,7 @@ private void insertNamespaces(final List<JsonNode> parents, final String pathAtt
}

@Override
public boolean isSearchPlatformFile(String indexName, File file) {
public boolean isSearchPlatformConfiguration(String indexName, File file) {
return file.isFile() && file.getName().equals("index-shape.json");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public class ExternalElasticsearch extends Elasticsearch {
private static final String NAME = "External Elasticsearch";
static final String SETTINGS_FILE = "index-settings.json";

private final Map<String, IndexSettings> indexSettingsMap = new HashMap<>();
private final Map<String, RestHighLevelClient> indexClients = new HashMap<>();

@Override
Expand All @@ -63,16 +62,18 @@ public void start() {
}

@Override
public void load(File corpus, File settingsFile, String targetIndexName) {
public void load(File dataToBeIndexed, File settingsFile, String collection, String version) {
// Corpus file is not used for this implementation
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

try {
// Load the index settings for this version of the search platform
IndexSettings settings = mapper.readValue(settingsFile, IndexSettings.class);
indexSettingsMap.put(targetIndexName, settings);
indexClients.put(targetIndexName, initialiseClient(settings.getHostUrls()));
if (indexClients.get(version) == null) {
indexClients.put(version, initialiseClient(settings.getHostUrls()));
}

} catch (IOException e) {
LOGGER.error("Could not read settings from " + settingsFile.getName() + " :: " + e.getMessage());
}
Expand All @@ -88,15 +89,11 @@ private RestHighLevelClient initialiseClient(List<String> hosts) {
}

@Override
public QueryOrSearchResponse executeQuery(final String indexName, final String query, final String[] fields, final int maxRows) {
// Find the actual index to search
if (!indexSettingsMap.containsKey(indexName)) {
throw new IllegalArgumentException("Cannot find settings for index " + indexName);
}
public QueryOrSearchResponse executeQuery(final String collection, String version, final String query, final String[] fields, final int maxRows) {

try {
final SearchRequest request = buildSearchRequest(indexSettingsMap.get(indexName).getIndex(), query, fields, maxRows);
final SearchResponse response = runQuery(indexName, request);
final SearchRequest request = buildSearchRequest(collection, query, fields, maxRows);
final SearchResponse response = runQuery(version, request);
return convertResponse(response);
} catch (final ElasticsearchException e) {
LOGGER.error("Caught ElasticsearchException :: " + e.getMessage());
Expand All @@ -106,10 +103,10 @@ public QueryOrSearchResponse executeQuery(final String indexName, final String q
}
}

private SearchResponse runQuery(final String indexKey, final SearchRequest request) throws IOException {
RestHighLevelClient client = indexClients.get(indexKey);
private SearchResponse runQuery(final String clientId, final SearchRequest request) throws IOException {
RestHighLevelClient client = indexClients.get(clientId);
if (client == null) {
throw new RuntimeException("No HTTP client found for index " + indexKey);
throw new RuntimeException("No HTTP client found for index " + clientId);
}
return client.search(request, RequestOptions.DEFAULT);
}
Expand All @@ -120,8 +117,8 @@ public String getName() {
}

@Override
public boolean isSearchPlatformFile(String indexName, File file) {
return file.isFile() && file.getName().equals(SETTINGS_FILE);
public boolean isSearchPlatformConfiguration(String indexName, File searchEngineStartupSettings) {
return searchEngineStartupSettings.isFile() && searchEngineStartupSettings.getName().equals(SETTINGS_FILE);
}

@Override
Expand All @@ -144,20 +141,12 @@ private void closeClient(RestHighLevelClient client) {


public static class IndexSettings {

@JsonProperty("index")
private final String index;
@JsonProperty("hostUrls")
private final List<String> hostUrls;

public IndexSettings(@JsonProperty("index") String index,
@JsonProperty("hostUrls") List<String> hostUrls) {
this.index = index;
this.hostUrls = hostUrls;
}
public IndexSettings(@JsonProperty("hostUrls") List<String> hostUrls) {

String getIndex() {
return index;
this.hostUrls = hostUrls;
}

List<String> getHostUrls() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ public void setupPlatform() {
@Test
public void isSearchPlatformFile_returnsFalseWhenDirectory() throws Exception {
File dummyFile = tempFolder.newFolder();
assertFalse(platform.isSearchPlatformFile(INDEX_NAME, dummyFile));
assertFalse(platform.isSearchPlatformConfiguration(INDEX_NAME, dummyFile));
}

@Test
public void isSearchPlatformFile_returnsFalseWhenFileIsNotESConfig() throws Exception {
File dummyFile = tempFolder.newFile();
assertFalse(platform.isSearchPlatformFile(INDEX_NAME, dummyFile));
assertFalse(platform.isSearchPlatformConfiguration(INDEX_NAME, dummyFile));
}

@Test
public void isSearchPlatformFile_returnsTrueWhenDirectoryContainsConfig() throws Exception {
File configFile = tempFolder.newFile("index-shape.json");
assertTrue(platform.isSearchPlatformFile(INDEX_NAME, configFile));
assertTrue(platform.isSearchPlatformConfiguration(INDEX_NAME, configFile));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ public void setupPlatform() {
@Test
public void isSearchPlatformFile_returnsFalseWhenDirectory() throws Exception {
File dummyFile = tempFolder.newFolder();
assertFalse(platform.isSearchPlatformFile(INDEX_NAME, dummyFile));
assertFalse(platform.isSearchPlatformConfiguration(INDEX_NAME, dummyFile));
}

@Test
public void isSearchPlatformFile_returnsFalseWhenFileIsNotESConfig() throws Exception {
File dummyFile = tempFolder.newFile();
assertFalse(platform.isSearchPlatformFile(INDEX_NAME, dummyFile));
assertFalse(platform.isSearchPlatformConfiguration(INDEX_NAME, dummyFile));
}

@Test
public void isSearchPlatformFile_returnsTrueWhenDirectoryContainsConfig() throws Exception {
File configFile = tempFolder.newFile(ExternalElasticsearch.SETTINGS_FILE);
assertTrue(platform.isSearchPlatformFile(INDEX_NAME, configFile));
assertTrue(platform.isSearchPlatformConfiguration(INDEX_NAME, configFile));
}
}
Loading