From 828dd5925e93365e86f09ffc8894099c36af87d5 Mon Sep 17 00:00:00 2001 From: Chris Sampson Date: Fri, 24 Jan 2025 18:41:05 +0000 Subject: [PATCH] Introduce ElasticsearchRequestOptions to allow for per-request parameters and headers in Elasticsearch operations --- .../ElasticSearchClientService.java | 80 ++++----- .../ElasticsearchRequestOptions.java | 43 +++++ .../ElasticSearchClientServiceImpl.java | 104 ++++++----- .../ElasticSearchLookupService.java | 4 +- .../ElasticSearchStringLookupService.java | 2 +- .../TestElasticSearchClientService.java | 34 ++-- .../integration/AbstractElasticsearch_IT.java | 2 +- .../ElasticSearchClientService_IT.java | 163 +++++++++--------- .../AbstractByQueryElasticsearch.java | 5 +- ...stractPaginatedJsonQueryElasticsearch.java | 14 +- .../DeleteByQueryElasticsearch.java | 7 +- .../ElasticsearchRestProcessor.java | 6 +- .../elasticsearch/GetElasticsearch.java | 6 +- .../elasticsearch/JsonQueryElasticsearch.java | 4 +- .../elasticsearch/PutElasticsearchJson.java | 5 +- .../elasticsearch/PutElasticsearchRecord.java | 18 +- .../UpdateByQueryElasticsearch.java | 8 +- .../AbstractByQueryElasticsearchTest.java | 28 +-- .../AbstractJsonQueryElasticsearchTest.java | 14 +- ...ctPaginatedJsonQueryElasticsearchTest.java | 10 +- .../elasticsearch/GetElasticsearchTest.java | 10 +- .../TestElasticsearchClientService.java | 72 ++++---- .../integration/AbstractElasticsearch_IT.java | 2 +- .../mock/AbstractMockElasticsearchClient.java | 33 ++-- .../mock/MockBulkLoadClientService.java | 7 +- 25 files changed, 369 insertions(+), 312 deletions(-) create mode 100644 nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchRequestOptions.java diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java index 63e433ab696a..5e2d2a37a682 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java @@ -296,21 +296,19 @@ Note that the Host is included in requests as a header (typically including doma * Index a document. * * @param operation A document to index. - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. * @return IndexOperationResponse if successful */ - IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters, Map requestHeaders); + IndexOperationResponse add(IndexOperationRequest operation, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Bulk process multiple documents. * * @param operations A list of index operations. - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. * @return IndexOperationResponse if successful. */ - IndexOperationResponse bulk(List operations, Map requestParameters, Map requestHeaders); + IndexOperationResponse bulk(List operations, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Count the documents that match the criteria. @@ -318,11 +316,10 @@ Note that the Host is included in requests as a header (typically including doma * @param query A query in the JSON DSL syntax * @param index The index to target. * @param type The type to target. Will not be used in future versions of Elasticsearch. - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. * @return number of documents matching the query */ - Long count(String query, String index, String type, Map requestParameters, Map requestHeaders); + Long count(String query, String index, String type, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Delete a document by its ID from an index. @@ -330,11 +327,10 @@ Note that the Host is included in requests as a header (typically including doma * @param index The index to target. * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch. * @param id The document ID to remove from the selected index. - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters, Map requestHeaders); + DeleteOperationResponse deleteById(String index, String type, String id, ElasticsearchRequestOptions elasticsearchRequestOptions); /** @@ -342,11 +338,10 @@ Note that the Host is included in requests as a header (typically including doma * @param index The index to target. * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch. * @param ids A list of document IDs to remove from the selected index. - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters, Map requestHeaders); + DeleteOperationResponse deleteById(String index, String type, List ids, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Delete documents by query. @@ -354,11 +349,10 @@ Note that the Host is included in requests as a header (typically including doma * @param query A valid JSON query to be used for finding documents to delete. * @param index The index to target. * @param type The type to target within the index. Optional. Will not be used in future versions of Elasticsearch. - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters, Map requestHeaders); + DeleteOperationResponse deleteByQuery(String query, String index, String type, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Update documents by query. @@ -366,30 +360,27 @@ Note that the Host is included in requests as a header (typically including doma * @param query A valid JSON query to be used for finding documents to update. * @param index The index to target. * @param type The type to target within the index. Optional. Will not be used in future versions of Elasticsearch. - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. * @return An UpdateOperationResponse object if successful. */ - UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters, Map requestHeaders); + UpdateOperationResponse updateByQuery(String query, String index, String type, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Refresh index/indices. * * @param index The index to target, if omitted then all indices will be updated. - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. */ - void refresh(final String index, final Map requestParameters, Map requestHeaders); + void refresh(String index, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Check whether an index exists. * * @param index The index to check. - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. * @return true if index exists, false otherwise */ - boolean exists(final String index, final Map requestParameters, Map requestHeaders); + boolean exists(String index, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Check whether a document exists. @@ -397,11 +388,10 @@ Note that the Host is included in requests as a header (typically including doma * @param index The index that holds the document. * @param type The document type. Optional. Will not be used in future versions of Elasticsearch. * @param id The document ID - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. * @return true if doc exists in index, false otherwise */ - boolean documentExists(final String index, final String type, final String id, final Map requestParameters, Map requestHeaders); + boolean documentExists(String index, String type, String id, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Get a document by ID. @@ -409,11 +399,10 @@ Note that the Host is included in requests as a header (typically including doma * @param index The index that holds the document. * @param type The document type. Optional. Will not be used in future versions of Elasticsearch. * @param id The document ID - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. * @return Map if successful, null if not found. */ - Map get(String index, String type, String id, Map requestParameters, Map requestHeaders); + Map get(String index, String type, String id, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Perform a search using the JSON DSL. @@ -421,20 +410,20 @@ Note that the Host is included in requests as a header (typically including doma * @param query A JSON string representing the query. * @param index The index to target. Optional. * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch. - * @param requestParameters A collection of URL request parameters. Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. * @return A SearchResponse object if successful. */ - SearchResponse search(String query, String index, String type, Map requestParameters, Map requestHeaders); + SearchResponse search(String query, String index, String type, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Retrieve next page of results from a Scroll. * * @param scroll A JSON string containing scrollId and optional scroll (keep alive) retention period. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. + * Request Parameters will be ignored from the Request Options as unusable for this API. * @return A SearchResponse object if successful. */ - SearchResponse scroll(String scroll, Map requestHeaders); + SearchResponse scroll(String scroll, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Initialise a Point in Time for paginated queries. @@ -442,29 +431,32 @@ Note that the Host is included in requests as a header (typically including doma * * @param index Index targeted. * @param keepAlive Point in Time's retention period (maximum time Elasticsearch will retain the PiT between requests). Optional. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. + * Request Parameters will be ignored from the Request Options as unusable for this API. * @return the Point in Time Id (pit_id) */ - String initialisePointInTime(String index, String keepAlive, Map requestHeaders); + String initialisePointInTime(String index, String keepAlive, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Delete a Point in Time. * Requires Elasticsearch 7.10+ and XPack features. * * @param pitId Point in Time Id to be deleted. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. + * Request Parameters will be ignored from the Request Options as unusable for this API. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deletePointInTime(String pitId, Map requestHeaders); + DeleteOperationResponse deletePointInTime(String pitId, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Delete a Scroll. * * @param scrollId Scroll Id to be deleted. - * @param requestHeaders A collection of request headers. Optional. + * @param elasticsearchRequestOptions A collection of request options (parameters and header). Optional. + * Request Parameters will be ignored from the Request Options as unusable for this API. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deleteScroll(String scrollId, Map requestHeaders); + DeleteOperationResponse deleteScroll(String scrollId, ElasticsearchRequestOptions elasticsearchRequestOptions); /** * Build a transit URL to use with the provenance reporter. diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchRequestOptions.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchRequestOptions.java new file mode 100644 index 000000000000..39c15d1cf139 --- /dev/null +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticsearchRequestOptions.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import java.util.HashMap; +import java.util.Map; + +public class ElasticsearchRequestOptions { + private final Map requestParameters; + private final Map requestHeaders; + + public ElasticsearchRequestOptions(final Map requestParameters, final Map requestHeaders) { + this.requestParameters = requestParameters == null ? new HashMap<>() : requestParameters; + this.requestHeaders = requestHeaders == null ? new HashMap<>() : requestHeaders; + } + + public ElasticsearchRequestOptions() { + this(new HashMap<>(), new HashMap<>()); + } + + public Map getRequestParameters() { + return requestParameters; + } + + public Map getRequestHeaders() { + return requestHeaders; + } +} diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java index f72eb5fb436d..365a2987e992 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java @@ -340,7 +340,9 @@ private void verifySniffer(final ConfigurationContext context, final RestClient private List getElasticsearchRoot(final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider) throws IOException { final Request request = new Request("GET", "/"); - addJWTAuthorizationHeader(request.getOptions().toBuilder(), tokenProvider); + final RequestOptions.Builder requestOptionsBuilder = request.getOptions().toBuilder(); + addJWTAuthorizationHeader(requestOptionsBuilder, tokenProvider); + request.setOptions(requestOptionsBuilder.build()); final Response response = verifyClient.performRequest(request); final List warnings = parseResponseWarningHeaders(response); @@ -573,7 +575,7 @@ private void appendIndex(final StringBuilder sb, final String index) { } } - private Response runQuery(final String endpoint, final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + private Response runQuery(final String endpoint, final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { final StringBuilder sb = new StringBuilder(); appendIndex(sb, index); if (StringUtils.isNotBlank(type)) { @@ -583,7 +585,7 @@ private Response runQuery(final String endpoint, final String query, final Strin try { final HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON); - return performRequest("POST", sb.toString(), requestParameters, requestHeaders, queryEntity); + return performRequest("POST", sb.toString(), elasticsearchRequestOptions, queryEntity); } catch (final Exception e) { throw new ElasticsearchException(e); } @@ -621,8 +623,8 @@ private List parseResponseWarningHeaders(final Response response) { } @Override - public IndexOperationResponse add(final IndexOperationRequest operation, final Map requestParameters, final Map requestHeaders) { - return bulk(Collections.singletonList(operation), requestParameters, requestHeaders); + public IndexOperationResponse add(final IndexOperationRequest operation, final ElasticsearchRequestOptions elasticsearchRequestOptions) { + return bulk(Collections.singletonList(operation), elasticsearchRequestOptions); } private String flatten(final String str) { @@ -692,7 +694,7 @@ protected void buildRequest(final IndexOperationRequest request, final StringBui } @Override - public IndexOperationResponse bulk(final List operations, final Map requestParameters, final Map requestHeaders) { + public IndexOperationResponse bulk(final List operations, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { final StringBuilder payload = new StringBuilder(); for (final IndexOperationRequest or : operations) { @@ -705,7 +707,7 @@ public IndexOperationResponse bulk(final List operations, final HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(); watch.start(); - final Response response = performRequest("POST", "/_bulk", requestParameters, requestHeaders, entity); + final Response response = performRequest("POST", "/_bulk", elasticsearchRequestOptions, entity); watch.stop(); final String rawResponse = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); @@ -722,20 +724,20 @@ public IndexOperationResponse bulk(final List operations, } @Override - public Long count(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { - final Response response = runQuery("_count", query, index, type, requestParameters, requestHeaders); + public Long count(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { + final Response response = runQuery("_count", query, index, type, elasticsearchRequestOptions); final Map parsed = parseResponse(response); return ((Integer) parsed.get("count")).longValue(); } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { - return deleteById(index, type, Collections.singletonList(id), requestParameters, requestHeaders); + public DeleteOperationResponse deleteById(final String index, final String type, final String id, final ElasticsearchRequestOptions elasticsearchRequestOptions) { + return deleteById(index, type, Collections.singletonList(id), elasticsearchRequestOptions); } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final Map requestParameters, final Map requestHeaders) { + public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { final StringBuilder sb = new StringBuilder(); for (final String id : ids) { @@ -745,7 +747,7 @@ public DeleteOperationResponse deleteById(final String index, final String type, final HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(); watch.start(); - final Response response = performRequest("POST", "/_bulk", requestParameters, requestHeaders, entity); + final Response response = performRequest("POST", "/_bulk", elasticsearchRequestOptions, entity); watch.stop(); if (getLogger().isDebugEnabled()) { @@ -760,10 +762,10 @@ public DeleteOperationResponse deleteById(final String index, final String type, } @Override - public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { final StopWatch watch = new StopWatch(); watch.start(); - final Response response = runQuery("_delete_by_query", query, index, type, requestParameters, requestHeaders); + final Response response = runQuery("_delete_by_query", query, index, type, elasticsearchRequestOptions); watch.stop(); // check for errors in response @@ -774,9 +776,9 @@ public DeleteOperationResponse deleteByQuery(final String query, final String in } @Override - public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { final long start = System.currentTimeMillis(); - final Response response = runQuery("_update_by_query", query, index, type, requestParameters, requestHeaders); + final Response response = runQuery("_update_by_query", query, index, type, elasticsearchRequestOptions); final long end = System.currentTimeMillis(); // check for errors in response @@ -786,12 +788,12 @@ public UpdateOperationResponse updateByQuery(final String query, final String in } @Override - public void refresh(final String index, final Map requestParameters, final Map requestHeaders) { + public void refresh(final String index, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { final StringBuilder endpoint = new StringBuilder(); appendIndex(endpoint, index); endpoint.append("/_refresh"); - final Response response = performRequest("POST", endpoint.toString(), requestParameters, requestHeaders, null); + final Response response = performRequest("POST", endpoint.toString(), elasticsearchRequestOptions, null); parseResponseWarningHeaders(response); } catch (final Exception ex) { throw new ElasticsearchException(ex); @@ -799,11 +801,11 @@ public void refresh(final String index, final Map requestParamet } @Override - public boolean exists(final String index, final Map requestParameters, final Map requestHeaders) { + public boolean exists(final String index, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { final StringBuilder endpoint = new StringBuilder(); appendIndex(endpoint, index); - final Response response = performRequest("HEAD", endpoint.toString(), requestParameters, requestHeaders, null); + final Response response = performRequest("HEAD", endpoint.toString(), elasticsearchRequestOptions, null); parseResponseWarningHeaders(response); if (response.getStatusLine().getStatusCode() == 200) { @@ -821,12 +823,12 @@ public boolean exists(final String index, final Map requestParam } @Override - public boolean documentExists(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { + public boolean documentExists(final String index, final String type, final String id, final ElasticsearchRequestOptions elasticsearchRequestOptions) { boolean exists = true; try { - final Map existsParameters = requestParameters != null ? new HashMap<>(requestParameters) : new HashMap<>(); - existsParameters.putIfAbsent("_source", "false"); - get(index, type, id, existsParameters, requestHeaders); + final ElasticsearchRequestOptions existsRequestOptions = elasticsearchRequestOptions == null ? new ElasticsearchRequestOptions() : elasticsearchRequestOptions; + existsRequestOptions.getRequestParameters().putIfAbsent("_source", "false"); + get(index, type, id, existsRequestOptions); } catch (final ElasticsearchException ee) { if (ee.isNotFound()) { exists = false; @@ -839,7 +841,7 @@ public boolean documentExists(final String index, final String type, final Strin @SuppressWarnings("unchecked") @Override - public Map get(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { + public Map get(final String index, final String type, final String id, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { final StringBuilder endpoint = new StringBuilder(); appendIndex(endpoint, index); @@ -850,7 +852,7 @@ public Map get(final String index, final String type, final Stri } endpoint.append("/").append(id); - final Response response = performRequest("GET", endpoint.toString(), requestParameters, requestHeaders, null); + final Response response = performRequest("GET", endpoint.toString(), elasticsearchRequestOptions, null); final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); parseResponseWarningHeaders(response); @@ -875,9 +877,9 @@ private int handleSearchCount(final Object raw) { } @Override - public SearchResponse search(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + public SearchResponse search(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { - final Response response = runQuery("_search", query, index, type, requestParameters, requestHeaders); + final Response response = runQuery("_search", query, index, type, elasticsearchRequestOptions); return buildSearchResponse(response); } catch (final ElasticsearchException ee) { throw ee; @@ -887,10 +889,12 @@ public SearchResponse search(final String query, final String index, final Strin } @Override - public SearchResponse scroll(final String scroll, final Map requestHeaders) { + public SearchResponse scroll(final String scroll, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { final HttpEntity scrollEntity = new NStringEntity(scroll, ContentType.APPLICATION_JSON); - final Response response = performRequest("POST", "/_search/scroll", Collections.emptyMap(), requestHeaders, scrollEntity); + final Response response = performRequest("POST", "/_search/scroll", + // ensure no request params, which would break the API call + Collections.emptyMap(), elasticsearchRequestOptions == null ? Collections.emptyMap() : elasticsearchRequestOptions.getRequestHeaders(), scrollEntity); return buildSearchResponse(response); } catch (final Exception ex) { throw new ElasticsearchException(ex); @@ -898,17 +902,19 @@ public SearchResponse scroll(final String scroll, final Map requ } @Override - public String initialisePointInTime(final String index, final String keepAlive, final Map requestHeaders) { + public String initialisePointInTime(final String index, final String keepAlive, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { - final Map params = new HashMap<>() {{ - if (StringUtils.isNotBlank(keepAlive)) { - put("keep_alive", keepAlive); - } - }}; + final Map params = new HashMap<>(); + if (StringUtils.isNotBlank(keepAlive)) { + params.putIfAbsent("keep_alive", keepAlive); + } + final StringBuilder endpoint = new StringBuilder(); appendIndex(endpoint, index); endpoint.append("/_pit"); - final Response response = performRequest("POST", endpoint.toString(), params, requestHeaders, null); + final Response response = performRequest("POST", endpoint.toString(), + // ensure only keep_alive param provided as required by the API + params, elasticsearchRequestOptions == null ? Collections.emptyMap() : elasticsearchRequestOptions.getRequestHeaders(), null); final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); parseResponseWarningHeaders(response); @@ -923,12 +929,14 @@ public String initialisePointInTime(final String index, final String keepAlive, } @Override - public DeleteOperationResponse deletePointInTime(final String pitId, final Map requestHeaders) { + public DeleteOperationResponse deletePointInTime(final String pitId, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { final HttpEntity pitEntity = new NStringEntity(String.format("{\"id\": \"%s\"}", pitId), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(true); - final Response response = performRequest("DELETE", "/_pit", Collections.emptyMap(), requestHeaders, pitEntity); + final Response response = performRequest("DELETE", "/_pit", + // ensure no request params, which would break the API call + Collections.emptyMap(), elasticsearchRequestOptions == null ? Collections.emptyMap() : elasticsearchRequestOptions.getRequestHeaders(), pitEntity); watch.stop(); if (getLogger().isDebugEnabled()) { @@ -949,12 +957,14 @@ public DeleteOperationResponse deletePointInTime(final String pitId, final Map requestHeaders) { + public DeleteOperationResponse deleteScroll(final String scrollId, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { final HttpEntity scrollBody = new NStringEntity(String.format("{\"scroll_id\": \"%s\"}", scrollId), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(true); - final Response response = performRequest("DELETE", "/_search/scroll", Collections.emptyMap(), requestHeaders, scrollBody); + final Response response = performRequest("DELETE", "/_search/scroll", + // ensure no request params, which would break the API call + Collections.emptyMap(), elasticsearchRequestOptions == null ? Collections.emptyMap() : elasticsearchRequestOptions.getRequestHeaders(), scrollBody); watch.stop(); if (getLogger().isDebugEnabled()) { @@ -1043,6 +1053,16 @@ private Request addRequestHeaders(final Request request, final Map parameters, final Map headers, final HttpEntity entity) throws IOException { final Request baseRequest = new Request(method, endpoint); final Request request = addRequestHeaders(baseRequest, headers); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java index 11bdd911af02..b9f0d7dd23c0 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java @@ -201,7 +201,7 @@ private Record getById(final String _id, final Map context) thro final String json = mapper.writeValueAsString(query); - final SearchResponse response = clientService.search(json, index, type, null, null); + final SearchResponse response = clientService.search(json, index, type, null); if (response.getNumberOfHits() > 1) { throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s", response.getNumberOfHits(), json)); @@ -252,7 +252,7 @@ private Record getByQuery(final Map query, final Map lookup(final Map coordinates) throws LookupFailureException { try { final String id = (String) coordinates.get(ID); - final Map enums = esClient.get(index, type, id, null, null); + final Map enums = esClient.get(index, type, id, null); if (enums == null) { return Optional.empty(); } else { diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java index f47df165a167..a150e71b9548 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java @@ -44,62 +44,62 @@ public List verify(ConfigurationContext context, Compo } @Override - public IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters, Map requestHeaders) { + public IndexOperationResponse add(IndexOperationRequest operation, ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public IndexOperationResponse bulk(List operations, Map requestParameters, Map requestHeaders) { + public IndexOperationResponse bulk(List operations, ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public Long count(String query, String index, String type, Map requestParameters, Map requestHeaders) { + public Long count(String query, String index, String type, ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters, Map requestHeaders) { + public DeleteOperationResponse deleteById(String index, String type, String id, ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters, Map requestHeaders) { + public DeleteOperationResponse deleteById(String index, String type, List ids, ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters, Map requestHeaders) { + public DeleteOperationResponse deleteByQuery(String query, String index, String type, ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters, Map requestHeaders) { + public UpdateOperationResponse updateByQuery(String query, String index, String type, ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public void refresh(final String index, final Map requestParameters, Map requestHeaders) { + public void refresh(final String index, final ElasticsearchRequestOptions elasticsearchRequestOptions) { // intentionally blank } @Override - public boolean exists(final String index, final Map requestParameters, Map requestHeaders) { + public boolean exists(final String index, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return true; } @Override - public boolean documentExists(String index, String type, String id, Map requestParameters, Map requestHeaders) { + public boolean documentExists(String index, String type, String id, ElasticsearchRequestOptions elasticsearchRequestOptions) { return true; } @Override - public Map get(String index, String type, String id, Map requestParameters, Map requestHeaders) { + public Map get(String index, String type, String id, ElasticsearchRequestOptions elasticsearchRequestOptions) { return data; } @Override - public SearchResponse search(String query, String index, String type, Map requestParameters, Map requestHeaders) { + public SearchResponse search(String query, String index, String type, ElasticsearchRequestOptions elasticsearchRequestOptions) { List> hits = new ArrayList<>(); Map source = new HashMap<>(); source.put("_source", data); @@ -109,22 +109,22 @@ public SearchResponse search(String query, String index, String type, Map requestHeaders) { - return search(null, null, null, null, requestHeaders); + public SearchResponse scroll(String scroll, ElasticsearchRequestOptions elasticsearchRequestOptions) { + return search(null, null, null, elasticsearchRequestOptions); } @Override - public String initialisePointInTime(String index, String keepAlive, Map requestHeaders) { + public String initialisePointInTime(String index, String keepAlive, ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public DeleteOperationResponse deletePointInTime(String pitId, Map requestHeaders) { + public DeleteOperationResponse deletePointInTime(String pitId, ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public DeleteOperationResponse deleteScroll(String scrollId, Map requestHeaders) { + public DeleteOperationResponse deleteScroll(String scrollId, ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java index 3fd1da483252..434c830de7a3 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java @@ -59,7 +59,7 @@ void before() throws Exception { runner.enableControllerService(service); - service.refresh(null, null, null); + service.refresh(null, null); } @AfterAll diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java index a6ea9d988894..72a8d7bfd7e5 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java @@ -31,6 +31,7 @@ import org.apache.nifi.elasticsearch.ElasticSearchClientService; import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl; import org.apache.nifi.elasticsearch.ElasticsearchException; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.IndexOperationRequest; import org.apache.nifi.elasticsearch.IndexOperationResponse; import org.apache.nifi.elasticsearch.MapBuilder; @@ -247,7 +248,7 @@ private void assertBasicSearch(final Map requestParameters, fina final String query = prettyJson(temp); - final SearchResponse response = service.search(query, "messages", type, requestParameters, requestHeaders); + final SearchResponse response = service.search(query, "messages", type, new ElasticsearchRequestOptions(requestParameters, requestHeaders)); assertNotNull(response, "Response was null"); assertEquals(15, response.getNumberOfHits(), "Wrong count"); @@ -274,16 +275,15 @@ private void assertBasicSearch(final Map requestParameters, fina } @Test - void testRunasUserUnauthorized() throws Exception { + void testRunAsUserUnauthorized() throws Exception { final Map temp = new MapBuilder() .of("size", 0, "query", new MapBuilder().of("match_all", new HashMap<>()).build()).build(); final String query = prettyJson(temp); final Map headers = Map.of("es-security-runas-user", "test-user"); - - + final ElasticsearchRequestOptions elasticsearchRequestOptions = new ElasticsearchRequestOptions(null, headers); final ElasticsearchException exception = assertThrows(ElasticsearchException.class, - () -> service.search(query, "messages", type, null, headers)); + () -> service.search(query, "messages", type, elasticsearchRequestOptions)); assertInstanceOf(ResponseException.class, exception.getCause()); assertEquals(403, ((ResponseException) exception.getCause()).getResponse().getStatusLine().getStatusCode()); @@ -301,8 +301,8 @@ void testSearchEmptySource() throws Exception { final String query = prettyJson(temp); - final SearchResponse response = service.search(query, "messages", type, Map.of("_source", "not_exists"), - Map.of("ES-Client-Authentication", "sharedsecret foobar")); + final SearchResponse response = service.search(query, "messages", type, new ElasticsearchRequestOptions(Map.of("_source", "not_exists"), + Map.of("ES-Client-Authentication", "sharedsecret foobar"))); assertNotNull(response, "Response was null"); assertNotNull(response.getHits(), "Hits was null"); @@ -322,7 +322,7 @@ void testSearchNoSource() throws Exception { final String query = prettyJson(temp); - final SearchResponse response = service.search(query, "no_source", type, null, null); + final SearchResponse response = service.search(query, "no_source", type, null); assertNotNull(response, "Response was null"); assertNotNull(response.getHits(), "Hits was null"); @@ -343,7 +343,7 @@ void testV6SearchWarnings() throws JsonProcessingException { ).build()) .build()); final String type = "a-type"; - final SearchResponse response = service.search(query, INDEX, type, null, null); + final SearchResponse response = service.search(query, INDEX, type, null); assertFalse(response.getWarnings().isEmpty(), "Missing warnings"); } @@ -354,7 +354,7 @@ void testV7SearchWarnings() throws JsonProcessingException { .of("size", 1, "query", new MapBuilder().of("match_all", new HashMap<>()).build()) .build()); final String type = "a-type"; - final SearchResponse response = service.search(query, INDEX, type, null, null); + final SearchResponse response = service.search(query, INDEX, type, null); assertFalse(response.getWarnings().isEmpty(), "Missing warnings"); } @@ -379,7 +379,7 @@ void testScroll() throws JsonProcessingException { .build()); // initiate the scroll - final SearchResponse response = service.search(query, INDEX, type, Collections.singletonMap("scroll", "10s"), null); + final SearchResponse response = service.search(query, INDEX, type, new ElasticsearchRequestOptions(Collections.singletonMap("scroll", "10s"), null)); assertNotNull(response, "Response was null"); assertEquals(15, response.getNumberOfHits(), "Wrong count"); @@ -399,7 +399,7 @@ void testScroll() throws JsonProcessingException { // scroll the next page final Map parameters = Map.of("scroll_id", response.getScrollId(), "scroll", "10s"); - final SearchResponse scrollResponse = service.scroll(prettyJson(parameters), Map.of("Accept", "application/json")); + final SearchResponse scrollResponse = service.scroll(prettyJson(parameters), new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))); assertNotNull(scrollResponse, "Scroll Response was null"); assertEquals(15, scrollResponse.getNumberOfHits(), "Wrong count"); @@ -415,11 +415,11 @@ void testScroll() throws JsonProcessingException { assertNotEquals(scrollResponse.getHits(), response.getHits(), "Same results"); // delete the scroll - DeleteOperationResponse deleteResponse = service.deleteScroll(scrollResponse.getScrollId(), Map.of("Accept", "application/json")); + DeleteOperationResponse deleteResponse = service.deleteScroll(scrollResponse.getScrollId(), new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))); assertNotNull(deleteResponse, "Delete Response was null"); // delete scroll again (should now be unknown but the 404 caught and ignored) - deleteResponse = service.deleteScroll(scrollResponse.getScrollId(), Map.of("Accept", "application/json")); + deleteResponse = service.deleteScroll(scrollResponse.getScrollId(), new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))); assertNotNull(deleteResponse, "Delete Response was null"); } @@ -440,7 +440,7 @@ void testSearchAfter() throws JsonProcessingException { final String query = prettyJson(queryMap); // search first page - final SearchResponse response = service.search(query, INDEX, type, null, null); + final SearchResponse response = service.search(query, INDEX, type, null); assertNotNull(response, "Response was null"); assertEquals(15, response.getNumberOfHits(), "Wrong count"); @@ -463,7 +463,7 @@ void testSearchAfter() throws JsonProcessingException { page2QueryMap.put("search_after", MAPPER.readValue(response.getSearchAfter(), List.class)); page2QueryMap.remove("aggs"); final String secondPage = prettyJson(page2QueryMap); - final SearchResponse secondResponse = service.search(secondPage, INDEX, type, null, null); + final SearchResponse secondResponse = service.search(secondPage, INDEX, type, null); assertNotNull(secondResponse, "Second Response was null"); assertEquals(15, secondResponse.getNumberOfHits(), "Wrong count"); @@ -487,7 +487,7 @@ void testPointInTime() throws JsonProcessingException { assumeTrue(majorVersion >= 8 || (majorVersion == 7 && minorVersion >= 10), "Requires version 7.10+"); // initialise - final String pitId = service.initialisePointInTime(INDEX, "10s", Map.of("Accept", "application/json")); + final String pitId = service.initialisePointInTime(INDEX, "10s", new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))); final Map queryMap = new MapBuilder() .of("size", 2, "query", new MapBuilder().of("match_all", new HashMap<>()).build()) @@ -506,7 +506,7 @@ void testPointInTime() throws JsonProcessingException { final String query = prettyJson(queryMap); // search first page - final SearchResponse response = service.search(query, null, type, null, null); + final SearchResponse response = service.search(query, null, type, new ElasticsearchRequestOptions()); assertNotNull(response, "Response was null"); assertEquals(15, response.getNumberOfHits(), "Wrong count"); @@ -529,7 +529,7 @@ void testPointInTime() throws JsonProcessingException { page2QueryMap.put("search_after", MAPPER.readValue(response.getSearchAfter(), List.class)); page2QueryMap.remove("aggs"); final String secondPage = prettyJson(page2QueryMap); - final SearchResponse secondResponse = service.search(secondPage, null, type, null, null); + final SearchResponse secondResponse = service.search(secondPage, null, type, null); assertNotNull(secondResponse, "Second Response was null"); assertEquals(15, secondResponse.getNumberOfHits(), "Wrong count"); @@ -559,7 +559,7 @@ void testDeleteByQuery() throws Exception { .of("query", new MapBuilder() .of("match", new MapBuilder().of("msg", "five").build()) .build()).build()); - final DeleteOperationResponse response = service.deleteByQuery(query, INDEX, type, null, Map.of("Accept", "application/json")); + final DeleteOperationResponse response = service.deleteByQuery(query, INDEX, type, new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))); assertNotNull(response); } @@ -571,7 +571,7 @@ void testDeleteByQueryRequestParameters() throws Exception { .build()).build()); final Map parameters = new HashMap<>(); parameters.put("refresh", "true"); - final DeleteOperationResponse response = service.deleteByQuery(query, INDEX, type, parameters, Map.of("Accept", "application/json")); + final DeleteOperationResponse response = service.deleteByQuery(query, INDEX, type, new ElasticsearchRequestOptions(parameters, Map.of("Accept", "application/json"))); assertNotNull(response); } @@ -581,7 +581,7 @@ void testUpdateByQuery() throws Exception { .of("query", new MapBuilder() .of("match", new MapBuilder().of("msg", "four").build()) .build()).build()); - final UpdateOperationResponse response = service.updateByQuery(query, INDEX, type, null, null); + final UpdateOperationResponse response = service.updateByQuery(query, INDEX, type, null); assertNotNull(response); } @@ -594,25 +594,26 @@ void testUpdateByQueryRequestParameters() throws Exception { final Map parameters = new HashMap<>(); parameters.put("refresh", "true"); parameters.put("slices", "1"); - final UpdateOperationResponse response = service.updateByQuery(query, INDEX, type, parameters, Map.of("Accept", "application/json")); + final UpdateOperationResponse response = service.updateByQuery(query, INDEX, type, new ElasticsearchRequestOptions(parameters, Map.of("Accept", "application/json"))); assertNotNull(response); } @Test void testDeleteById() throws Exception { final String ID = "1"; - final Map originalDoc = service.get(INDEX, type, ID, null, Map.of("Accept", "application/json")); + final Map originalDoc = service.get(INDEX, type, ID, new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))); try { - final DeleteOperationResponse response = service.deleteById(INDEX, type, ID, null, null); + final DeleteOperationResponse response = service.deleteById(INDEX, type, ID, null); assertNotNull(response); final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> - service.get(INDEX, type, ID, null, null)); + service.get(INDEX, type, ID, null)); assertTrue(ee.isNotFound()); - final Map doc = service.get(INDEX, type, "2", null, null); + final Map doc = service.get(INDEX, type, "2", new ElasticsearchRequestOptions()); assertNotNull(doc); } finally { // replace the deleted doc - service.add(new IndexOperationRequest(INDEX, type, "1", originalDoc, IndexOperationRequest.Operation.Index, null, false, null, null), null, Map.of("Accept", "application/json")); + service.add(new IndexOperationRequest(INDEX, type, "1", originalDoc, IndexOperationRequest.Operation.Index, null, false, null, null), + new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))); waitForIndexRefresh(); // (affects later tests using _search or _bulk) } } @@ -621,7 +622,7 @@ void testDeleteById() throws Exception { void testGet() { for (int index = 1; index <= 15; index++) { final String id = String.valueOf(index); - final Map doc = service.get(INDEX, type, id, null, null); + final Map doc = service.get(INDEX, type, id, null); assertNotNull(doc, "Doc was null"); assertNotNull(doc.get("msg"), "${doc.toString()}\t${doc.keySet().toString()}"); } @@ -629,27 +630,27 @@ void testGet() { @Test void testGetEmptySource() { - final Map doc = service.get(INDEX, type, "1", Collections.singletonMap("_source", "not_exist"), null); + final Map doc = service.get(INDEX, type, "1", new ElasticsearchRequestOptions(Collections.singletonMap("_source", "not_exist"), null)); assertNotNull(doc, "Doc was null"); assertTrue(doc.isEmpty(), "Doc was not empty"); } @Test void testGetNoSource() { - final Map doc = service.get("no_source", type, "1", null, Map.of("Accept", "application/json")); + final Map doc = service.get("no_source", type, "1", new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))); assertNotNull(doc, "Doc was null"); assertTrue(doc.isEmpty(), "Doc was not empty"); } @Test void testGetNotFound() { - final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.get(INDEX, type, "not_found", null, null)); + final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.get(INDEX, type, "not_found", null)); assertTrue(ee.isNotFound()); } @Test void testExists() { - assertTrue(service.exists(INDEX, null, null), "index does not exist"); - assertFalse(service.exists("index-does-not-exist", null, Map.of("Accept", "application/json")), "index exists"); + assertTrue(service.exists(INDEX, null), "index does not exist"); + assertFalse(service.exists("index-does-not-exist", new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))), "index exists"); } @Test @@ -659,7 +660,7 @@ void testCompression() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null, null), "index does not exist"); + assertTrue(service.exists(INDEX, new ElasticsearchRequestOptions(null, null)), "index does not exist"); } @Test @@ -669,7 +670,7 @@ void testNoMetaHeader() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null, null), "index does not exist"); + assertTrue(service.exists(INDEX, null), "index does not exist"); } @Test @@ -679,7 +680,7 @@ void testStrictDeprecation() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null, null), "index does not exist"); + assertTrue(service.exists(INDEX, null), "index does not exist"); } @Test @@ -689,7 +690,7 @@ void testNodeSelector() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null, null), "index does not exist"); + assertTrue(service.exists(INDEX, null), "index does not exist"); } @Test @@ -700,7 +701,7 @@ void testRestClientRequestHeaders() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null, null), "index does not exist"); + assertTrue(service.exists(INDEX, null), "index does not exist"); } @Test @@ -714,7 +715,7 @@ void testSniffer() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null, null), "index does not exist"); + assertTrue(service.exists(INDEX, null), "index does not exist"); } @Test @@ -733,22 +734,21 @@ void testNullSuppression() throws InterruptedException { Collections.singletonList( new IndexOperationRequest("nulls", type, "1", doc, IndexOperationRequest.Operation.Index, null, false, null, null) ), - null, - Map.of("Accept", "application/json") + new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json")) ); assertNotNull(response); waitForIndexRefresh(); - Map result = service.get("nulls", type, "1", null, null); + Map result = service.get("nulls", type, "1", null); assertEquals(doc, result); // suppress nulls suppressNulls(true); - response = service.bulk(Collections.singletonList(new IndexOperationRequest("nulls", type, "2", doc, IndexOperationRequest.Operation.Index, null, false, null, null)), null, null); + response = service.bulk(Collections.singletonList(new IndexOperationRequest("nulls", type, "2", doc, IndexOperationRequest.Operation.Index, null, false, null, null)), null); assertNotNull(response); waitForIndexRefresh(); - result = service.get("nulls", type, "2", null, null); + result = service.get("nulls", type, "2", null); assertTrue(result.keySet().containsAll(Arrays.asList("msg", "is_blank")), "Non-nulls (present): " + result); assertFalse(result.containsKey("is_null"), "is_null (should be omitted): " + result); assertFalse(result.containsKey("is_empty"), "is_empty (should be omitted): " + result); @@ -779,7 +779,7 @@ void testBulkAddTwoIndexes() throws Exception { put("msg", "test"); }}, IndexOperationRequest.Operation.Index, null, false, null, null)); } - final IndexOperationResponse response = service.bulk(payload, Map.of("refresh", "true"), null); + final IndexOperationResponse response = service.bulk(payload, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); assertNotNull(response); waitForIndexRefresh(); @@ -787,9 +787,9 @@ void testBulkAddTwoIndexes() throws Exception { * Now, check to ensure that both indexes got populated appropriately. */ final String query = "{ \"query\": { \"match_all\": {}}}"; - final Long indexA = service.count(query, "bulk_a", type, null, Map.of("Accept", "application/json")); - final Long indexB = service.count(query, "bulk_b", type, null, null); - final Long indexC = service.count(query, "bulk_c", type, null, null); + final Long indexA = service.count(query, "bulk_a", type, new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))); + final Long indexB = service.count(query, "bulk_b", type, null); + final Long indexC = service.count(query, "bulk_c", type, null); assertNotNull(indexA); assertNotNull(indexB); @@ -799,7 +799,7 @@ void testBulkAddTwoIndexes() throws Exception { assertEquals(10, indexB.intValue()); assertEquals(5, indexC.intValue()); - final Long total = service.count(query, "bulk_a,bulk_b,bulk_c", type, null, null); + final Long total = service.count(query, "bulk_a,bulk_b,bulk_c", type, null); assertNotNull(total); assertEquals(25, total.intValue()); } @@ -816,16 +816,16 @@ void testBulkRequestParametersAndBulkHeaders() { payload.add(new IndexOperationRequest("bulk_c", type, String.valueOf(x), new MapBuilder().of("msg", "test").build(), IndexOperationRequest.Operation.Index, null, false, null, null)); } - final IndexOperationResponse response = service.bulk(payload, Map.of("refresh", "true"), null); + final IndexOperationResponse response = service.bulk(payload, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); assertNotNull(response); /* * Now, check to ensure that all indices got populated and refreshed appropriately. */ final String query = "{ \"query\": { \"match_all\": {}}}"; - final Long indexA = service.count(query, "bulk_a", type, null, null); - final Long indexB = service.count(query, "bulk_b", type, null, null); - final Long indexC = service.count(query, "bulk_c", type, null, null); + final Long indexA = service.count(query, "bulk_a", type, null); + final Long indexB = service.count(query, "bulk_b", type, null); + final Long indexC = service.count(query, "bulk_c", type, null); assertNotNull(indexA); assertNotNull(indexB); @@ -835,7 +835,7 @@ void testBulkRequestParametersAndBulkHeaders() { assertEquals(10, indexB.intValue()); assertEquals(5, indexC.intValue()); - final Long total = service.count(query, "bulk_*", type, null, null); + final Long total = service.count(query, "bulk_*", type, null); assertNotNull(total); assertEquals(25, total.intValue()); } @@ -844,7 +844,7 @@ void testBulkRequestParametersAndBulkHeaders() { void testUnknownBulkHeader() { final IndexOperationRequest failingRequest = new IndexOperationRequest("bulk_c", type, "1", new MapBuilder().of("msg", "test").build(), IndexOperationRequest.Operation.Index, null, false, null, Collections.singletonMap("not_exist", "true")); - final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.add(failingRequest, null, null)); + final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.add(failingRequest, null)); assertInstanceOf(ResponseException.class, ee.getCause()); assertTrue(ee.getCause().getMessage().contains("Action/metadata line [1] contains an unknown parameter [not_exist]")); } @@ -858,7 +858,7 @@ void testDynamicTemplates() { IndexOperationRequest.Operation.Index, null, false, new MapBuilder().of("hello", "test_text").build(), null) ); - final IndexOperationResponse response = service.bulk(payload, Map.of("refresh", "true"), null); + final IndexOperationResponse response = service.bulk(payload, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); assertNotNull(response); /* @@ -879,8 +879,9 @@ void testUpdateAndUpsert() throws InterruptedException { final Map doc = new HashMap<>(); doc.put("msg", "Buongiorno, mondo"); doc.put("counter", 1); - service.add(new IndexOperationRequest(INDEX, type, TEST_ID, doc, IndexOperationRequest.Operation.Index, null, false, null, null), Map.of("refresh", "true"), null); - Map result = service.get(INDEX, type, TEST_ID, null, null); + service.add(new IndexOperationRequest(INDEX, type, TEST_ID, doc, IndexOperationRequest.Operation.Index, null, false, null, null), + new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); + Map result = service.get(INDEX, type, TEST_ID, null); assertEquals(doc, result, "Not the same"); final Map updates = new HashMap<>(); @@ -889,8 +890,8 @@ void testUpdateAndUpsert() throws InterruptedException { merged.putAll(updates); merged.putAll(doc); IndexOperationRequest request = new IndexOperationRequest(INDEX, type, TEST_ID, updates, IndexOperationRequest.Operation.Update, null, false, null, null); - service.add(request, Map.of("refresh", "true"), null); - result = service.get(INDEX, type, TEST_ID, null, null); + service.add(request, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); + result = service.get(INDEX, type, TEST_ID, null); assertTrue(result.containsKey("from")); assertTrue(result.containsKey("counter")); assertTrue(result.containsKey("msg")); @@ -901,8 +902,8 @@ void testUpdateAndUpsert() throws InterruptedException { upsertItems.put("upsert_2", 1); upsertItems.put("upsert_3", true); request = new IndexOperationRequest(INDEX, type, UPSERTED_ID, upsertItems, IndexOperationRequest.Operation.Upsert, null, false, null, null); - service.add(request, Map.of("refresh", "true"), null); - result = service.get(INDEX, type, UPSERTED_ID, null, null); + service.add(request, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); + result = service.get(INDEX, type, UPSERTED_ID, null); assertEquals(upsertItems, result); final Map upsertDoc = new HashMap<>(); @@ -913,14 +914,14 @@ void testUpdateAndUpsert() throws InterruptedException { script.put("params", Collections.singletonMap("count", 2)); // apply script to existing document request = new IndexOperationRequest(INDEX, type, TEST_ID, upsertDoc, IndexOperationRequest.Operation.Upsert, script, false, null, null); - service.add(request, Map.of("refresh", "true"), null); - result = service.get(INDEX, type, TEST_ID, null, null); + service.add(request, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); + result = service.get(INDEX, type, TEST_ID, new ElasticsearchRequestOptions()); assertEquals(doc.get("msg"), result.get("msg")); assertEquals(3, result.get("counter")); // index document that doesn't already exist (don't apply script) request = new IndexOperationRequest(INDEX, type, UPSERT_SCRIPT_ID, upsertDoc, IndexOperationRequest.Operation.Upsert, script, false, null, null); - service.add(request, Map.of("refresh", "true"), null); - result = service.get(INDEX, type, UPSERT_SCRIPT_ID, null, null); + service.add(request, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); + result = service.get(INDEX, type, UPSERT_SCRIPT_ID, new ElasticsearchRequestOptions(null, null)); assertNull(result.get("counter")); assertEquals(upsertDoc, result); @@ -931,23 +932,23 @@ void testUpdateAndUpsert() throws InterruptedException { upsertScript.put("params", Collections.singletonMap("count", 2)); // no script execution if doc found (without scripted_upsert) request = new IndexOperationRequest(INDEX, type, SCRIPTED_UPSERT_ID, emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, false, null, null); - service.add(request, Map.of("refresh", "true"), null); - assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, null, Map.of("Accept", "application/json"))); + service.add(request, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); + assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json")))); // script execution with no doc found (with scripted_upsert) - doc not create, no "upsert" doc provided (empty objects suppressed) suppressNulls(true); request = new IndexOperationRequest(INDEX, type, SCRIPTED_UPSERT_ID, emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true, null, null); - service.add(request, Map.of("refresh", "true"), null); - assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, null, null)); + service.add(request, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); + assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, null)); // script execution with no doc found (with scripted_upsert) - doc created, empty "upsert" doc provided suppressNulls(false); request = new IndexOperationRequest(INDEX, type, SCRIPTED_UPSERT_ID, emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true, null, null); - service.add(request, Map.of("refresh", "true"), null); - result = service.get(INDEX, type, SCRIPTED_UPSERT_ID, null, null); + service.add(request, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); + result = service.get(INDEX, type, SCRIPTED_UPSERT_ID, new ElasticsearchRequestOptions(null, null)); assertEquals(2, result.get("counter")); // script execution with no doc found (with scripted_upsert) - doc updated request = new IndexOperationRequest(INDEX, type, SCRIPTED_UPSERT_ID, emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true, null, null); - service.add(request, Map.of("refresh", "true"), null); - result = service.get(INDEX, type, SCRIPTED_UPSERT_ID, null, null); + service.add(request, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); + result = service.get(INDEX, type, SCRIPTED_UPSERT_ID, new ElasticsearchRequestOptions()); assertEquals(4, result.get("counter")); } finally { final List deletes = new ArrayList<>(); @@ -955,12 +956,12 @@ void testUpdateAndUpsert() throws InterruptedException { deletes.add(new IndexOperationRequest(INDEX, type, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete, null, false, null, null)); deletes.add(new IndexOperationRequest(INDEX, type, UPSERT_SCRIPT_ID, null, IndexOperationRequest.Operation.Delete, null, false, null, null)); deletes.add(new IndexOperationRequest(INDEX, type, SCRIPTED_UPSERT_ID, null, IndexOperationRequest.Operation.Delete, null, false, null, null)); - assertFalse(service.bulk(deletes, Map.of("refresh", "true"), null).hasErrors()); + assertFalse(service.bulk(deletes, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)).hasErrors()); waitForIndexRefresh(); // wait 1s for index refresh (doesn't prevent GET but affects later tests using _search or _bulk) - assertFalse(service.documentExists(INDEX, type, TEST_ID, null, null)); - assertFalse(service.documentExists(INDEX, type, UPSERTED_ID, null, null)); - assertFalse(service.documentExists(INDEX, type, UPSERT_SCRIPT_ID, null, null)); - assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, null, null)); + assertFalse(service.documentExists(INDEX, type, TEST_ID, null)); + assertFalse(service.documentExists(INDEX, type, UPSERTED_ID, null)); + assertFalse(service.documentExists(INDEX, type, UPSERT_SCRIPT_ID, null)); + assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, null)); } } @@ -975,7 +976,7 @@ void testGetBulkResponsesWithErrors() { new IndexOperationRequest(INDEX, type, "1", new MapBuilder().of("msg", "one", "intField", "notaninteger").build(), IndexOperationRequest.Operation.Index, null, false, null, null) // can't parse int field ); - final IndexOperationResponse response = service.bulk(ops, Map.of("refresh", "true"), null); + final IndexOperationResponse response = service.bulk(ops, new ElasticsearchRequestOptions(Map.of("refresh", "true"), null)); assertTrue(response.hasErrors()); assertEquals(2, response.getItems().stream().filter(it -> { final Optional first = it.keySet().stream().findFirst(); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java index c63859d856c6..fcbf3d324540 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java @@ -23,6 +23,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.elasticsearch.ElasticSearchClientService; import org.apache.nifi.elasticsearch.ElasticsearchException; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.OperationResponse; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -71,7 +72,7 @@ public abstract class AbstractByQueryElasticsearch extends AbstractProcessor imp abstract OperationResponse performOperation(final ElasticSearchClientService clientService, final String query, final String index, final String type, - final Map requestParameters, final Map requestHeaders); + final ElasticsearchRequestOptions elasticsearchRequestOptions); @Override public Set getRelationships() { @@ -132,7 +133,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session : null; final OperationResponse or = performOperation(clientService.get(), query, index, type, - getRequestParametersFromDynamicProperties(context, input), getRequestHeadersFromDynamicProperties(context, input)); + new ElasticsearchRequestOptions(getRequestParametersFromDynamicProperties(context, input), getRequestHeadersFromDynamicProperties(context, input))); if (input == null) { input = session.create(); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java index 99ff0ef2b2df..d479e77c42a4 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.SearchResponse; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -111,7 +112,7 @@ SearchResponse doQuery(final PaginatedJsonQueryParameters paginatedJsonQueryPara if (!requestParameters.isEmpty()) { getLogger().warn("Elasticsearch _scroll API does not accept query parameters, ignoring dynamic properties {}", requestParameters.keySet()); } - response = clientService.get().scroll(queryJson, requestHeaders); + response = clientService.get().scroll(queryJson, new ElasticsearchRequestOptions(null, requestHeaders)); } else { if (paginationType == PaginationType.SCROLL) { requestParameters.put("scroll", paginatedJsonQueryParameters.getKeepAlive()); @@ -122,8 +123,7 @@ SearchResponse doQuery(final PaginatedJsonQueryParameters paginatedJsonQueryPara // Point in Time uses general /_search API not /index/_search paginationType == PaginationType.POINT_IN_TIME ? null : paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getType(), - requestParameters, - requestHeaders + new ElasticsearchRequestOptions(requestParameters, requestHeaders) ); paginatedJsonQueryParameters.setPitId(response.getPitId()); paginatedJsonQueryParameters.setSearchAfter(response.getSearchAfter()); @@ -202,7 +202,9 @@ private String updateQueryJson(final boolean newQuery, final PaginatedJsonQueryP // add pit_id to query JSON final String queryPitId = newQuery ? clientService.get().initialisePointInTime( - paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getKeepAlive(), getRequestHeadersFromDynamicProperties(context, input)) + paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getKeepAlive(), + new ElasticsearchRequestOptions(null, getRequestHeadersFromDynamicProperties(context, input)) + ) : paginatedJsonQueryParameters.getPitId(); final ObjectNode pit = JsonNodeFactory.instance.objectNode().put("id", queryPitId); @@ -296,13 +298,13 @@ void clearElasticsearchState(final ProcessContext context, final SearchResponse final String scrollId = getScrollId(context, response); if (StringUtils.isNotBlank(scrollId)) { - clientService.get().deleteScroll(scrollId, requestHeaders); + clientService.get().deleteScroll(scrollId, new ElasticsearchRequestOptions(null, requestHeaders)); } } else if (paginationType == PaginationType.POINT_IN_TIME) { final String pitId = getPitId(context, response); if (StringUtils.isNotBlank(pitId)) { - clientService.get().deletePointInTime(pitId, requestHeaders); + clientService.get().deletePointInTime(pitId, new ElasticsearchRequestOptions(null, requestHeaders)); } } } catch (final Exception ex) { diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java index d545de88162e..07f61f58e8fd 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java @@ -26,11 +26,11 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.OperationResponse; import org.apache.nifi.expression.ExpressionLanguageScope; import java.util.List; -import java.util.Map; @WritesAttributes({ @WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."), @@ -79,8 +79,7 @@ String getErrorAttribute() { @Override OperationResponse performOperation(final ElasticSearchClientService clientService, final String query, - final String index, final String type, final Map requestParameters, - final Map requestHeaders) { - return clientService.deleteByQuery(query, index, type, requestParameters, requestHeaders); + final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { + return clientService.deleteByQuery(query, index, type, elasticsearchRequestOptions); } } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java index 46e06aadcebb..44b2ae69db4f 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java @@ -28,6 +28,7 @@ import org.apache.nifi.components.Validator; import org.apache.nifi.elasticsearch.ElasticSearchClientService; import org.apache.nifi.elasticsearch.ElasticsearchException; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.SearchResponse; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -373,7 +374,8 @@ default List verify(final ProcessContext context, fina if (context.getProperty(INDEX).isSet()) { index = context.getProperty(INDEX).evaluateAttributeExpressions(attributes).getValue(); try { - if (verifyClientService.exists(index, getRequestParametersFromDynamicProperties(context, attributes), getRequestHeadersFromDynamicProperties(context, attributes))) { + if (verifyClientService.exists(index, new ElasticsearchRequestOptions(getRequestParametersFromDynamicProperties(context, attributes), + getRequestHeadersFromDynamicProperties(context, attributes)))) { indexExistsResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) .explanation(String.format("Index [%s] exists", index)); indexExists = true; @@ -432,7 +434,7 @@ default List verifyAfterIndex(final ProcessContext con requestParameters.putIfAbsent("_source", "false"); final SearchResponse response = verifyClientService.search( - mapper.writeValueAsString(queryJson), index, type, requestParameters, getRequestHeadersFromDynamicProperties(context, attributes)); + mapper.writeValueAsString(queryJson), index, type, new ElasticsearchRequestOptions(requestParameters, getRequestHeadersFromDynamicProperties(context, attributes))); queryValidResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) .explanation(String.format("Query found %d hits and %d aggregations in %d milliseconds, timed out: %s", response.getNumberOfHits(), response.getAggregations() == null ? 0 : response.getAggregations().size(), response.getTook(), response.isTimedOut())); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java index 2b5036bf1020..d0482d0508e3 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java @@ -33,6 +33,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.elasticsearch.ElasticSearchClientService; import org.apache.nifi.elasticsearch.ElasticsearchException; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -196,7 +197,8 @@ public List verifyAfterIndex(final ProcessContext cont try { final Map requestParameters = new HashMap<>(getRequestParametersFromDynamicProperties(context, attributes)); requestParameters.putIfAbsent("_source", "false"); - if (verifyClientService.documentExists(index, type, id, requestParameters, getRequestHeadersFromDynamicProperties(context, attributes))) { + if (verifyClientService.documentExists(index, type, id, new ElasticsearchRequestOptions(requestParameters, + getRequestHeadersFromDynamicProperties(context, attributes)))) { documentExistsResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) .explanation(String.format("Document [%s] exists in index [%s]", id, index)); } else { @@ -245,7 +247,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final StopWatch stopWatch = new StopWatch(true); final Map doc = clientService.get().get(index, type, id, - getRequestParametersFromDynamicProperties(context, input), getRequestHeadersFromDynamicProperties(context, input)); + new ElasticsearchRequestOptions(getRequestParametersFromDynamicProperties(context, input), getRequestHeadersFromDynamicProperties(context, input))); final Map attributes = new HashMap<>(4, 1); attributes.put("filename", id); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java index 6e993505bb27..2c914918413f 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java @@ -24,6 +24,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.SearchResponse; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -82,8 +83,7 @@ SearchResponse doQuery(final JsonQueryParameters queryJsonParameters, final List queryJsonParameters.getQuery(), queryJsonParameters.getIndex(), queryJsonParameters.getType(), - getRequestParametersFromDynamicProperties(context, input), - getRequestHeadersFromDynamicProperties(context, input) + new ElasticsearchRequestOptions(getRequestParametersFromDynamicProperties(context, input), getRequestHeadersFromDynamicProperties(context, input)) ); if (input != null) { session.getProvenanceReporter().send( diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java index 574ed25f0754..4c1d89145490 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java @@ -33,6 +33,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.elasticsearch.ElasticSearchClientService; import org.apache.nifi.elasticsearch.ElasticsearchException; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.IndexOperationRequest; import org.apache.nifi.elasticsearch.IndexOperationResponse; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -281,8 +282,8 @@ private Map getMapFromAttribute(final PropertyDescriptor propert private List indexDocuments(final List operations, final List originals, final ProcessContext context, final ProcessSession session) throws IOException { final Map dynamicProperties = getRequestParametersFromDynamicProperties(context, originals.getFirst()); - final IndexOperationResponse response = clientService.get().bulk(operations, getRequestURLParameters(dynamicProperties), - getRequestHeadersFromDynamicProperties(context, originals.getFirst())); + final IndexOperationResponse response = clientService.get().bulk(operations, + new ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties), getRequestHeadersFromDynamicProperties(context, originals.getFirst()))); final Map> errors = findElasticsearchResponseErrors(response); final List errorDocuments = new ArrayList<>(errors.size()); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java index 5816a859e437..c2dc769de28d 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java @@ -33,6 +33,7 @@ import org.apache.nifi.components.Validator; import org.apache.nifi.elasticsearch.ElasticSearchClientService; import org.apache.nifi.elasticsearch.ElasticsearchException; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.IndexOperationRequest; import org.apache.nifi.elasticsearch.IndexOperationResponse; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -531,7 +532,7 @@ private void removeResultRecordFlowFiles(final List results, final Pro private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessSession session, final FlowFile input, final IndexOperationParameters indexOperationParameters, final int batch) throws IOException, SchemaNotFoundException, MalformedRecordException { - final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), indexOperationParameters.getRequestParameters(), indexOperationParameters.getRequestHeaders()); + final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), indexOperationParameters.getElasticsearchRequestOptions()); final Map> errors = findElasticsearchResponseErrors(response); if (!errors.isEmpty()) { @@ -856,8 +857,7 @@ private class IndexOperationParameters { private final RecordPath scriptedUpsertPath; private final RecordPath dynamicTypesPath; - private final Map requestHeaders; - private final Map requestParameters; + private final ElasticsearchRequestOptions elasticsearchRequestOptions; private final Map bulkHeaderRecordPaths; private final boolean retainId; @@ -879,10 +879,8 @@ private class IndexOperationParameters { scriptedUpsertPath = compileRecordPathFromProperty(context, SCRIPTED_UPSERT_RECORD_PATH, input); dynamicTypesPath = compileRecordPathFromProperty(context, DYNAMIC_TEMPLATES_RECORD_PATH, input); - requestHeaders = getRequestHeadersFromDynamicProperties(context, input); - final Map dynamicProperties = getRequestParametersFromDynamicProperties(context, input); - requestParameters = getRequestURLParameters(dynamicProperties); + elasticsearchRequestOptions = new ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties), getRequestHeadersFromDynamicProperties(context, input)); final Map bulkHeaderParameterPaths = getBulkHeaderParameters(dynamicProperties); bulkHeaderRecordPaths = new HashMap<>(bulkHeaderParameterPaths.size(), 1); @@ -953,12 +951,8 @@ public RecordPath getDynamicTypesPath() { return dynamicTypesPath; } - public Map getRequestHeaders() { - return requestHeaders; - } - - public Map getRequestParameters() { - return requestParameters; + public ElasticsearchRequestOptions getElasticsearchRequestOptions() { + return elasticsearchRequestOptions; } public Map getBulkHeaderRecordPaths() { diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java index 2c36f3006788..72258a951d0a 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java @@ -25,11 +25,10 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.OperationResponse; import org.apache.nifi.expression.ExpressionLanguageScope; -import java.util.Map; - @WritesAttributes({ @WritesAttribute(attribute = "elasticsearch.update.took", description = "The amount of time that it took to complete the update operation in ms."), @WritesAttribute(attribute = "elasticsearch.update.error", description = "The error message provided by Elasticsearch if there is an error running the update.") @@ -70,8 +69,7 @@ String getErrorAttribute() { @Override OperationResponse performOperation(final ElasticSearchClientService clientService, final String query, - final String index, final String type, final Map requestParameters, - final Map requestHeaders) { - return clientService.updateByQuery(query, index, type, requestParameters, requestHeaders); + final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { + return clientService.updateByQuery(query, index, type, elasticsearchRequestOptions); } } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.java index 50e907d05c40..78da85cd9759 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.java @@ -172,8 +172,8 @@ public void testWithFlowfileInput() { postTest(runner, query); - assertTrue(client.getRequestParameters().isEmpty()); - assertTrue(client.getRequestHeaders().isEmpty()); + assertTrue(client.getElasticsearchRequestOptions().getRequestParameters().isEmpty()); + assertTrue(client.getElasticsearchRequestOptions().getRequestHeaders().isEmpty()); } @Test @@ -191,12 +191,12 @@ public void testWithFlowfileInputAndRequestParametersAndRequestHeaders() { postTest(runner, query); - assertEquals(2, client.getRequestParameters().size()); - assertEquals("true", client.getRequestParameters().get("refresh")); - assertEquals("auto", client.getRequestParameters().get("slices")); + assertEquals(2, client.getElasticsearchRequestOptions().getRequestParameters().size()); + assertEquals("true", client.getElasticsearchRequestOptions().getRequestParameters().get("refresh")); + assertEquals("auto", client.getElasticsearchRequestOptions().getRequestParameters().get("slices")); - assertEquals(1, client.getRequestHeaders().size()); - assertEquals("application/json", client.getRequestHeaders().get("Accept")); + assertEquals(1, client.getElasticsearchRequestOptions().getRequestHeaders().size()); + assertEquals("application/json", client.getElasticsearchRequestOptions().getRequestHeaders().get("Accept")); } @Test @@ -269,8 +269,8 @@ public void testNoInputHandling() { postTest(runner, query); - assertTrue(client.getRequestParameters().isEmpty()); - assertTrue(client.getRequestHeaders().isEmpty()); + assertTrue(client.getElasticsearchRequestOptions().getRequestParameters().isEmpty()); + assertTrue(client.getElasticsearchRequestOptions().getRequestHeaders().isEmpty()); } @Test @@ -291,12 +291,12 @@ public void testNoInputHandlingWithRequestParameters() { postTest(runner, query); - assertEquals(2, client.getRequestParameters().size()); - assertEquals("true", client.getRequestParameters().get("refresh")); - assertEquals("auto", client.getRequestParameters().get("slices")); + assertEquals(2, client.getElasticsearchRequestOptions().getRequestParameters().size()); + assertEquals("true", client.getElasticsearchRequestOptions().getRequestParameters().get("refresh")); + assertEquals("auto", client.getElasticsearchRequestOptions().getRequestParameters().get("slices")); - assertEquals(1, client.getRequestHeaders().size()); - assertEquals("application/json", client.getRequestHeaders().get("Accept")); + assertEquals(1, client.getElasticsearchRequestOptions().getRequestHeaders().size()); + assertEquals("application/json", client.getElasticsearchRequestOptions().getRequestHeaders().get("Accept")); } @ParameterizedTest diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.java index 3f12d62b7007..f1b1da857067 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.java @@ -465,17 +465,17 @@ void testRequestParameters() { final TestElasticsearchClientService service = getService(runner); if (runner.getProcessor() instanceof AbstractPaginatedJsonQueryElasticsearch) { - assertEquals(3, service.getRequestParameters().size()); - assertEquals("600s", service.getRequestParameters().get("scroll")); + assertEquals(3, service.getElasticsearchRequestOptions().getRequestParameters().size()); + assertEquals("600s", service.getElasticsearchRequestOptions().getRequestParameters().get("scroll")); } else { - assertEquals(2, service.getRequestParameters().size()); + assertEquals(2, service.getElasticsearchRequestOptions().getRequestParameters().size()); } - assertEquals("true", service.getRequestParameters().get("refresh")); - assertEquals("auto", service.getRequestParameters().get("slices")); + assertEquals("true", service.getElasticsearchRequestOptions().getRequestParameters().get("refresh")); + assertEquals("auto", service.getElasticsearchRequestOptions().getRequestParameters().get("slices")); - assertEquals(1, service.getRequestHeaders().size()); - assertEquals("application/json", service.getRequestHeaders().get("Accept")); + assertEquals(1, service.getElasticsearchRequestOptions().getRequestHeaders().size()); + assertEquals("application/json", service.getElasticsearchRequestOptions().getRequestHeaders().get("Accept")); } @ParameterizedTest diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java index 02130acd5972..e47aa804652e 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.java @@ -61,10 +61,12 @@ void testInvalidPaginationProperties() { runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, "not-enum"); final AssertionError assertionError = assertThrows(AssertionError.class, runner::run); - final String expected = String.format("Processor has 2 validation failures:\n" + - "'%s' validated against 'not-enum' is invalid because Given value not found in allowed set '%s'\n" + - "'%s' validated against 'not-a-period' is invalid because Must be of format where " + - "is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days\n", + final String expected = String.format(""" + Processor has 2 validation failures: + '%s' validated against 'not-enum' is invalid because Given value not found in allowed set '%s' + '%s' validated against 'not-a-period' is invalid because Must be of format where \ + is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days + """, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(), Stream.of(PaginationType.values()).map(PaginationType::getValue).collect(Collectors.joining(", ")), AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName()); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.java index 3700438cc8be..ac3557fbf4a1 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.java @@ -209,12 +209,12 @@ void testEmptyId() { runProcessor(runner); final TestElasticsearchClientService service = getService(runner); - assertEquals(2, service.getRequestParameters().size()); - assertEquals("true", service.getRequestParameters().get("refresh")); - assertEquals("msg", service.getRequestParameters().get("_source")); + assertEquals(2, service.getElasticsearchRequestOptions().getRequestParameters().size()); + assertEquals("true", service.getElasticsearchRequestOptions().getRequestParameters().get("refresh")); + assertEquals("msg", service.getElasticsearchRequestOptions().getRequestParameters().get("_source")); - assertEquals(1, service.getRequestHeaders().size()); - assertEquals("application/json", service.getRequestHeaders().get("Accept")); + assertEquals(1, service.getElasticsearchRequestOptions().getRequestHeaders().size()); + assertEquals("application/json", service.getElasticsearchRequestOptions().getRequestHeaders().get("Accept")); } private static void testCounts(final TestRunner runner, final int doc, final int failure, final int notFound) { diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.java index f43bfd4c6fbe..dada8d56456f 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.java @@ -21,6 +21,7 @@ import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.elasticsearch.DeleteOperationResponse; import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.IndexOperationRequest; import org.apache.nifi.elasticsearch.IndexOperationResponse; import org.apache.nifi.elasticsearch.SearchResponse; @@ -58,8 +59,7 @@ public class TestElasticsearchClientService extends AbstractControllerService im private boolean throwErrorInUpdate; private int pageCount = 0; private int maxPages = 1; - private Map requestParameters; - private Map requestHeaders; + private ElasticsearchRequestOptions elasticsearchRequestOptions; private boolean scrolling = false; private String query; @@ -68,7 +68,7 @@ public TestElasticsearchClientService(final boolean returnAggs) { this.returnAggs = returnAggs; } - private void common(final boolean throwError, final Map requestParameters, final Map requestHeaders) throws IOException { + private void common(final boolean throwError, final ElasticsearchRequestOptions elasticsearchRequestOptions) throws IOException { if (throwError) { if (throwNotFoundInGet) { throw new MockElasticsearchException(false, true); @@ -77,8 +77,10 @@ private void common(final boolean throwError, final Map requestP } } - this.requestParameters = requestParameters; - this.requestHeaders = requestHeaders; + if (!scrolling && elasticsearchRequestOptions != null) { + // ignore the request options for scroll operations if there are request options already being tracked + this.elasticsearchRequestOptions = elasticsearchRequestOptions; + } } @Override @@ -87,14 +89,14 @@ public List verify(final ConfigurationContext context, } @Override - public IndexOperationResponse add(final IndexOperationRequest operation, final Map requestParameters, final Map requestHeaders) { - return bulk(Collections.singletonList(operation), requestParameters, requestHeaders); + public IndexOperationResponse add(final IndexOperationRequest operation, final ElasticsearchRequestOptions elasticsearchRequestOptions) { + return bulk(Collections.singletonList(operation), elasticsearchRequestOptions); } @Override - public IndexOperationResponse bulk(final List operations, final Map requestParameters, final Map requestHeaders) { + public IndexOperationResponse bulk(final List operations, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { - common(false, requestParameters, requestHeaders); + common(false, elasticsearchRequestOptions); } catch (final IOException e) { throw new RuntimeException(e); } @@ -102,9 +104,9 @@ public IndexOperationResponse bulk(final List operations, } @Override - public Long count(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + public Long count(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { - common(false, requestParameters, requestHeaders); + common(false, elasticsearchRequestOptions); } catch (final IOException e) { throw new RuntimeException(e); } @@ -113,14 +115,14 @@ public Long count(final String query, final String index, final String type, fin } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { - return deleteById(index, type, Collections.singletonList(id), requestParameters, requestHeaders); + public DeleteOperationResponse deleteById(final String index, final String type, final String id, final ElasticsearchRequestOptions elasticsearchRequestOptions) { + return deleteById(index, type, Collections.singletonList(id), elasticsearchRequestOptions); } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final Map requestParameters, final Map requestHeaders) { + public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { - common(throwErrorInDelete, requestParameters, requestHeaders); + common(throwErrorInDelete, elasticsearchRequestOptions); } catch (final IOException e) { throw new RuntimeException(e); } @@ -128,15 +130,15 @@ public DeleteOperationResponse deleteById(final String index, final String type, } @Override - public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { this.query = query; - return deleteById(index, type, Collections.singletonList("1"), requestParameters, requestHeaders); + return deleteById(index, type, Collections.singletonList("1"), elasticsearchRequestOptions); } @Override - public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { - common(throwErrorInUpdate, requestParameters, requestHeaders); + common(throwErrorInUpdate, elasticsearchRequestOptions); } catch (final IOException e) { throw new RuntimeException(e); } @@ -145,24 +147,24 @@ public UpdateOperationResponse updateByQuery(final String query, final String in } @Override - public void refresh(final String index, final Map requestParameters, final Map requestHeaders) { + public void refresh(final String index, final ElasticsearchRequestOptions elasticsearchRequestOptions) { // intentionally blank } @Override - public boolean exists(final String index, final Map requestParameters, final Map requestHeaders) { + public boolean exists(final String index, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return true; } @Override - public boolean documentExists(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { + public boolean documentExists(final String index, final String type, final String id, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return true; } @Override - public Map get(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { + public Map get(final String index, final String type, final String id, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { - common(throwErrorInGet || throwNotFoundInGet, requestParameters, requestHeaders); + common(throwErrorInGet || throwNotFoundInGet, elasticsearchRequestOptions); } catch (final IOException e) { throw new RuntimeException(e); } @@ -172,9 +174,9 @@ public Map get(final String index, final String type, final Stri } @Override - public SearchResponse search(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + public SearchResponse search(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { try { - common(throwErrorInSearch, requestParameters, requestHeaders); + common(throwErrorInSearch, elasticsearchRequestOptions); } catch (final IOException e) { throw new RuntimeException(e); } @@ -195,19 +197,19 @@ public SearchResponse search(final String query, final String index, final Strin } @Override - public SearchResponse scroll(final String scroll, final Map requestHeaders) { + public SearchResponse scroll(final String scroll, final ElasticsearchRequestOptions elasticsearchRequestOptions) { if (throwErrorInSearch) { throw new RuntimeException(new IOException("Simulated IOException - scroll")); } scrolling = true; - final SearchResponse response = search(null, null, null, requestParameters, requestHeaders); + final SearchResponse response = search(null, null, null, elasticsearchRequestOptions); scrolling = false; return response; } @Override - public String initialisePointInTime(final String index, final String keepAlive, final Map requestHeaders) { + public String initialisePointInTime(final String index, final String keepAlive, final ElasticsearchRequestOptions elasticsearchRequestOptions) { if (throwErrorInPit) { throw new RuntimeException(new IOException("Simulated IOException - initialisePointInTime")); } @@ -218,7 +220,7 @@ public String initialisePointInTime(final String index, final String keepAlive, } @Override - public DeleteOperationResponse deletePointInTime(final String pitId, final Map requestHeaders) { + public DeleteOperationResponse deletePointInTime(final String pitId, final ElasticsearchRequestOptions elasticsearchRequestOptions) { if (throwErrorInDelete) { throw new RuntimeException(new IOException("Simulated IOException - deletePointInTime")); } @@ -227,7 +229,7 @@ public DeleteOperationResponse deletePointInTime(final String pitId, final Map requestHeaders) { + public DeleteOperationResponse deleteScroll(final String scrollId, final ElasticsearchRequestOptions elasticsearchRequestOptions) { if (throwErrorInDelete) { throw new RuntimeException(new IOException("Simulated IOException - deleteScroll")); } @@ -272,12 +274,8 @@ public void setMaxPages(final int maxPages) { this.maxPages = maxPages; } - public Map getRequestHeaders() { - return this.requestHeaders; - } - - public Map getRequestParameters() { - return this.requestParameters; + public ElasticsearchRequestOptions getElasticsearchRequestOptions() { + return this.elasticsearchRequestOptions; } public String getQuery() { diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java index 43075c23097e..0c99bb3be182 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java @@ -64,7 +64,7 @@ void before() throws Exception { runner.setProperty(ElasticsearchRestProcessor.TYPE, type); } - service.refresh(null, null, null); + service.refresh(null, null); } @AfterEach diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java index 713863e1439a..63a51d80d124 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java @@ -21,6 +21,7 @@ import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.elasticsearch.DeleteOperationResponse; import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.IndexOperationRequest; import org.apache.nifi.elasticsearch.IndexOperationResponse; import org.apache.nifi.elasticsearch.SearchResponse; @@ -38,82 +39,82 @@ public List verify(final ConfigurationContext context, } @Override - public IndexOperationResponse add(final IndexOperationRequest operation, final Map requestParameters, final Map requestHeaders) { + public IndexOperationResponse add(final IndexOperationRequest operation, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public IndexOperationResponse bulk(final List operations, final Map requestParameters, final Map requestHeaders) { + public IndexOperationResponse bulk(final List operations, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public Long count(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + public Long count(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { + public DeleteOperationResponse deleteById(final String index, final String type, final String id, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final Map requestParameters, final Map requestHeaders) { + public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public void refresh(final String index, final Map requestParameters, final Map requestHeaders) { + public void refresh(final String index, final ElasticsearchRequestOptions elasticsearchRequestOptions) { // intentionally blank } @Override - public boolean exists(final String index, final Map requestParameters, final Map requestHeaders) { + public boolean exists(final String index, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return true; } @Override - public boolean documentExists(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { + public boolean documentExists(final String index, final String type, final String id, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return true; } @Override - public Map get(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { + public Map get(final String index, final String type, final String id, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public SearchResponse search(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + public SearchResponse search(final String query, final String index, final String type, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public SearchResponse scroll(final String scroll, final Map requestHeaders) { + public SearchResponse scroll(final String scroll, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public String initialisePointInTime(final String index, final String keepAlive, final Map requestHeaders) { + public String initialisePointInTime(final String index, final String keepAlive, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public DeleteOperationResponse deletePointInTime(final String pitId, final Map requestHeaders) { + public DeleteOperationResponse deletePointInTime(final String pitId, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } @Override - public DeleteOperationResponse deleteScroll(final String scrollId, final Map requestHeaders) { + public DeleteOperationResponse deleteScroll(final String scrollId, final ElasticsearchRequestOptions elasticsearchRequestOptions) { return null; } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java index 70ffb326464e..f83196214165 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.elasticsearch.mock; +import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions; import org.apache.nifi.elasticsearch.IndexOperationRequest; import org.apache.nifi.elasticsearch.IndexOperationResponse; @@ -30,7 +31,7 @@ public class MockBulkLoadClientService extends AbstractMockElasticsearchClient { private Consumer> evalHeadersConsumer; @Override - public IndexOperationResponse bulk(final List items, final Map requestParameters, final Map requestHeaders) { + public IndexOperationResponse bulk(final List items, final ElasticsearchRequestOptions elasticsearchRequestOptions) { if (isThrowRetryableError()) { throw new MockElasticsearchException(true, false); } else if (isThrowFatalError()) { @@ -42,11 +43,11 @@ public IndexOperationResponse bulk(final List items, fina } if (evalParametersConsumer != null) { - evalParametersConsumer.accept(requestParameters); + evalParametersConsumer.accept(elasticsearchRequestOptions.getRequestParameters()); } if (evalHeadersConsumer != null) { - evalHeadersConsumer.accept(requestHeaders); + evalHeadersConsumer.accept(elasticsearchRequestOptions.getRequestHeaders()); } return response;