diff --git a/service/commons/elasticsearch/client-api/src/main/java/org/eclipse/kapua/service/elasticsearch/client/ElasticsearchClient.java b/service/commons/elasticsearch/client-api/src/main/java/org/eclipse/kapua/service/elasticsearch/client/ElasticsearchClient.java index 171e4ab53ea..89518459093 100644 --- a/service/commons/elasticsearch/client-api/src/main/java/org/eclipse/kapua/service/elasticsearch/client/ElasticsearchClient.java +++ b/service/commons/elasticsearch/client-api/src/main/java/org/eclipse/kapua/service/elasticsearch/client/ElasticsearchClient.java @@ -259,6 +259,14 @@ public interface ElasticsearchClient { */ void refreshAllIndexes() throws ClientException; + /** + * Forces the Elasticsearch to refresh a specific index. + * + * @throws ClientException if error occurs while refreshing. + * @since 2.1.0 + */ + void refreshIndex(String index) throws ClientException; + /** * Deletes all indexes. * diff --git a/service/commons/elasticsearch/client-api/src/main/java/org/eclipse/kapua/service/elasticsearch/client/ElasticsearchRepository.java b/service/commons/elasticsearch/client-api/src/main/java/org/eclipse/kapua/service/elasticsearch/client/ElasticsearchRepository.java index 4b20e39a1d4..2580650644d 100644 --- a/service/commons/elasticsearch/client-api/src/main/java/org/eclipse/kapua/service/elasticsearch/client/ElasticsearchRepository.java +++ b/service/commons/elasticsearch/client-api/src/main/java/org/eclipse/kapua/service/elasticsearch/client/ElasticsearchRepository.java @@ -256,6 +256,15 @@ public void refreshAllIndexes() { } } + public void refreshIndex(String indexExp) { + try { + this.indexUpserted.invalidateAll(); + elasticsearchClientProviderInstance.getElasticsearchClient().refreshIndex(indexExp); + } catch (ClientException e) { + throw new RuntimeException(e); + } + } + @Override public void deleteAllIndexes() { try { diff --git a/service/commons/elasticsearch/client-rest/src/main/java/org/eclipse/kapua/service/elasticsearch/client/rest/ElasticsearchResourcePaths.java b/service/commons/elasticsearch/client-rest/src/main/java/org/eclipse/kapua/service/elasticsearch/client/rest/ElasticsearchResourcePaths.java index acece9483a6..2d77bd5976a 100644 --- a/service/commons/elasticsearch/client-rest/src/main/java/org/eclipse/kapua/service/elasticsearch/client/rest/ElasticsearchResourcePaths.java +++ b/service/commons/elasticsearch/client-rest/src/main/java/org/eclipse/kapua/service/elasticsearch/client/rest/ElasticsearchResourcePaths.java @@ -99,6 +99,13 @@ public static String refreshAllIndexes() { return "/_all/_refresh"; } + /** + * @since 2.1.0 + */ + public static String refreshIndex(@NotNull String index) { + return String.format("/%s/_refresh", index); + } + /** * @since 1.0.0 */ diff --git a/service/commons/elasticsearch/client-rest/src/main/java/org/eclipse/kapua/service/elasticsearch/client/rest/RestElasticsearchClient.java b/service/commons/elasticsearch/client-rest/src/main/java/org/eclipse/kapua/service/elasticsearch/client/rest/RestElasticsearchClient.java index 785579640ec..5af8f218ec5 100644 --- a/service/commons/elasticsearch/client-rest/src/main/java/org/eclipse/kapua/service/elasticsearch/client/rest/RestElasticsearchClient.java +++ b/service/commons/elasticsearch/client-rest/src/main/java/org/eclipse/kapua/service/elasticsearch/client/rest/RestElasticsearchClient.java @@ -437,6 +437,16 @@ public void refreshAllIndexes() throws ClientException { } } + public void refreshIndex(String index) throws ClientException { + LOG.debug("Refresh index: {}", index); + Request request = new Request(ElasticsearchKeywords.ACTION_POST, ElasticsearchResourcePaths.refreshIndex(index)); + Response refreshIndexResponse = restCallTimeoutHandler(() -> getClient().performRequest(request), index, "REFRESH INDEX"); + + if (!isRequestSuccessful(refreshIndexResponse)) { + throw buildExceptionFromUnsuccessfulResponse("Refresh indexes", refreshIndexResponse); + } + } + @Override public void deleteAllIndexes() throws ClientException { LOG.debug("Delete all indexes"); diff --git a/service/commons/storable/api/src/main/java/org/eclipse/kapua/service/storable/repository/StorableRepository.java b/service/commons/storable/api/src/main/java/org/eclipse/kapua/service/storable/repository/StorableRepository.java index 6df3f8b0b93..21c3c69bcd0 100644 --- a/service/commons/storable/api/src/main/java/org/eclipse/kapua/service/storable/repository/StorableRepository.java +++ b/service/commons/storable/api/src/main/java/org/eclipse/kapua/service/storable/repository/StorableRepository.java @@ -41,6 +41,8 @@ public interface StorableRepository< void refreshAllIndexes(); + void refreshIndex(String indexExp); + void deleteAllIndexes(); void deleteIndexes(String indexExp); diff --git a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/ChannelInfoElasticsearchRepository.java b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/ChannelInfoElasticsearchRepository.java index 5c042a1d345..fc4d15db6df 100644 --- a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/ChannelInfoElasticsearchRepository.java +++ b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/ChannelInfoElasticsearchRepository.java @@ -63,4 +63,14 @@ protected String indexResolver(KapuaId scopeId) { protected JsonNode getIndexSchema() throws MappingException { return ChannelInfoSchema.getChannelTypeSchema(); } + + @Override + public void refreshAllIndexes() { + super.refreshIndex(datastoreUtils.getChannelIndexName(KapuaId.ANY)); + } + + @Override + public void deleteAllIndexes() { + super.deleteIndexes(datastoreUtils.getChannelIndexName(KapuaId.ANY)); + } } diff --git a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/ClientInfoElasticsearchRepository.java b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/ClientInfoElasticsearchRepository.java index 5309b56383e..44a4c3d4c47 100644 --- a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/ClientInfoElasticsearchRepository.java +++ b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/ClientInfoElasticsearchRepository.java @@ -64,4 +64,13 @@ protected StorableId idExtractor(ClientInfo storable) { return storable.getId(); } + @Override + public void refreshAllIndexes() { + super.refreshIndex(datastoreUtils.getClientIndexName(KapuaId.ANY)); + } + + @Override + public void deleteAllIndexes() { + super.deleteIndexes(datastoreUtils.getClientIndexName(KapuaId.ANY)); + } } diff --git a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageElasticsearchRepository.java b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageElasticsearchRepository.java index 913fb9e4f6b..cefebc5323e 100644 --- a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageElasticsearchRepository.java +++ b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MessageElasticsearchRepository.java @@ -199,13 +199,13 @@ private ObjectNode getNewMessageMappingsBuilder(Map esMetrics) t @Override public void refreshAllIndexes() { - super.refreshAllIndexes(); + super.refreshIndex(datastoreUtils.getDataIndexName(KapuaId.ANY)); this.metricsByIndex.invalidateAll(); } @Override public void deleteAllIndexes() { - super.deleteAllIndexes(); + super.deleteIndexes(datastoreUtils.getDataIndexName(KapuaId.ANY)); this.metricsByIndex.invalidateAll(); } diff --git a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MetricInfoRepositoryImpl.java b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MetricInfoRepositoryImpl.java index 217a852b4c4..87bbdc9b2cc 100644 --- a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MetricInfoRepositoryImpl.java +++ b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/MetricInfoRepositoryImpl.java @@ -63,4 +63,14 @@ protected String indexResolver(KapuaId scopeId) { protected StorableId idExtractor(MetricInfo storable) { return storable.getId(); } + + @Override + public void refreshAllIndexes() { + super.refreshIndex(datastoreUtils.getMetricIndexName(KapuaId.ANY)); + } + + @Override + public void deleteAllIndexes() { + super.deleteIndexes(datastoreUtils.getMetricIndexName(KapuaId.ANY)); + } } diff --git a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/DatastoreUtils.java b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/DatastoreUtils.java index 7c011181d29..bca0007f3b9 100644 --- a/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/DatastoreUtils.java +++ b/service/datastore/internal/src/main/java/org/eclipse/kapua/service/datastore/internal/mediator/DatastoreUtils.java @@ -136,7 +136,12 @@ public String getDataIndexName(KapuaId scopeId) { if (StringUtils.isNotEmpty(prefix)) { sb.append(prefix).append("-"); } - String indexName = normalizedIndexName(scopeId.toStringId()); + String indexName; + if (KapuaId.ANY.equals(scopeId)) { + indexName = "*"; + } else { + indexName = normalizedIndexName(scopeId.toStringId()); + } sb.append(indexName).append("-").append("data-message").append("-*"); return sb.toString(); } @@ -198,7 +203,12 @@ private String getRegistryIndexName(KapuaId scopeId, IndexType indexType) { if (StringUtils.isNotEmpty(prefix)) { sb.append(prefix).append("-"); } - String indexName = normalizedIndexName(scopeId.toStringId()); + String indexName; + if (KapuaId.ANY.equals(scopeId)) { + indexName = "*"; + } else { + indexName = normalizedIndexName(scopeId.toStringId()); + } sb.append(indexName); sb.append("-data-").append(indexType.name().toLowerCase()); return sb.toString();