From 597529d745497def730d4a239a5f38a338db4970 Mon Sep 17 00:00:00 2001 From: jzonthemtn Date: Wed, 21 Feb 2024 14:12:23 -0500 Subject: [PATCH] #56 Using a default store for queries. --- .../UserBehaviorLoggingActionFilter.java | 73 ++++++++++--------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingActionFilter.java b/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingActionFilter.java index eac44cd..b56c65b 100644 --- a/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingActionFilter.java +++ b/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingActionFilter.java @@ -20,6 +20,7 @@ import org.opensearch.core.action.ActionResponse; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.Header; import org.opensearch.ubl.HeaderConstants; import org.opensearch.ubl.backends.Backend; import org.opensearch.ubl.model.QueryRequest; @@ -63,60 +64,62 @@ public void app @Override public void onResponse(Response response) { - LOGGER.info("Query ID header: " + task.getHeader("query-id")); + //LOGGER.info("Query ID header: " + task.getHeader("query-id")); final long startTime = System.currentTimeMillis(); - final String eventStore = task.getHeader(HeaderConstants.EVENT_STORE_HEADER); + String eventStore = task.getHeader(HeaderConstants.EVENT_STORE_HEADER); - // If there is no event store header we should not continue anything. - if(eventStore != null && !eventStore.trim().isEmpty()) { - - // Get the search itself. - final SearchRequest searchRequest = (SearchRequest) request; + // If there is no event store header we'll use a "default" store. + if(eventStore == null || eventStore.trim().isEmpty()) { + eventStore = "default"; + } - // TODO: Restrict logging to only queries of certain indices specified in the settings. - //final List indices = Arrays.asList(searchRequest.indices()); - //final Set indicesToLog = new HashSet<>(Arrays.asList(settings.get(SettingsConstants.INDEX_NAMES).split(","))); - //if(indicesToLog.containsAll(indices)) { + LOGGER.info("Using UBL event store: {}", eventStore); - // Get all search hits from the response. - if (response instanceof SearchResponse) { + // Get the search itself. + final SearchRequest searchRequest = (SearchRequest) request; - // Create a UUID for this search request. - final String queryId = UUID.randomUUID().toString(); + // TODO: Restrict logging to only queries of certain indices specified in the settings. + //final List indices = Arrays.asList(searchRequest.indices()); + //final Set indicesToLog = new HashSet<>(Arrays.asList(settings.get(SettingsConstants.INDEX_NAMES).split(","))); + //if(indicesToLog.containsAll(indices)) { - // The query will be empty when there is no query, e.g. /_search - final String query = searchRequest.source().toString(); + // Get all search hits from the response. + if (response instanceof SearchResponse) { - // Create a UUID for this search response. - final String queryResponseId = UUID.randomUUID().toString(); + // Create a UUID for this search request. + final String queryId = UUID.randomUUID().toString(); - final List queryResponseHitIds = new LinkedList<>(); + // The query will be empty when there is no query, e.g. /_search + final String query = searchRequest.source().toString(); - final SearchResponse searchResponse = (SearchResponse) response; + // Create a UUID for this search response. + final String queryResponseId = UUID.randomUUID().toString(); - // Add each hit to the list of query responses. - searchResponse.getHits().forEach(hit -> { - queryResponseHitIds.add(String.valueOf(hit.docId())); - }); + final List queryResponseHitIds = new LinkedList<>(); - try { + final SearchResponse searchResponse = (SearchResponse) response; - // Persist the query to the backend. - backend.persistQuery(eventStore, - new QueryRequest(queryId, query), - new QueryResponse(queryId, queryResponseId, queryResponseHitIds)); + // Add each hit to the list of query responses. + searchResponse.getHits().forEach(hit -> { + queryResponseHitIds.add(String.valueOf(hit.docId())); + }); - } catch (Exception ex) { - // TODO: Handle this. - LOGGER.error("Unable to persist query.", ex); - } + try { - threadPool.getThreadContext().addResponseHeader("query_id", queryId); + // Persist the query to the backend. + backend.persistQuery(eventStore, + new QueryRequest(queryId, query), + new QueryResponse(queryId, queryResponseId, queryResponseHitIds)); + } catch (Exception ex) { + // TODO: Handle this. + LOGGER.error("Unable to persist query.", ex); } + threadPool.getThreadContext().addResponseHeader("query_id", queryId); + //} final long elapsedTime = System.currentTimeMillis() - startTime;