Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Merge pull request #57 from o19s/default-store
Browse files Browse the repository at this point in the history
Using a default store for queries
  • Loading branch information
jzonthemtn authored Feb 21, 2024
2 parents efe2123 + 597529d commit 2f7ba3c
Showing 1 changed file with 38 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.core.action.ActionResponse;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Header;
import org.opensearch.ubl.HeaderConstants;
import org.opensearch.ubl.backends.Backend;
import org.opensearch.ubl.model.QueryRequest;
Expand Down Expand Up @@ -63,60 +64,62 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
@Override
public void onResponse(Response response) {

LOGGER.info("Query ID header: " + task.getHeader("query-id"));
//LOGGER.info("Query ID header: " + task.getHeader("query-id"));

final long startTime = System.currentTimeMillis();

final String eventStore = task.getHeader(HeaderConstants.EVENT_STORE_HEADER);
String eventStore = task.getHeader(HeaderConstants.EVENT_STORE_HEADER);

// If there is no event store header we should not continue anything.
if(eventStore != null && !eventStore.trim().isEmpty()) {

// Get the search itself.
final SearchRequest searchRequest = (SearchRequest) request;
// If there is no event store header we'll use a "default" store.
if(eventStore == null || eventStore.trim().isEmpty()) {
eventStore = "default";
}

// TODO: Restrict logging to only queries of certain indices specified in the settings.
//final List<String> indices = Arrays.asList(searchRequest.indices());
//final Set<String> indicesToLog = new HashSet<>(Arrays.asList(settings.get(SettingsConstants.INDEX_NAMES).split(",")));
//if(indicesToLog.containsAll(indices)) {
LOGGER.info("Using UBL event store: {}", eventStore);

// Get all search hits from the response.
if (response instanceof SearchResponse) {
// Get the search itself.
final SearchRequest searchRequest = (SearchRequest) request;

// Create a UUID for this search request.
final String queryId = UUID.randomUUID().toString();
// TODO: Restrict logging to only queries of certain indices specified in the settings.
//final List<String> indices = Arrays.asList(searchRequest.indices());
//final Set<String> indicesToLog = new HashSet<>(Arrays.asList(settings.get(SettingsConstants.INDEX_NAMES).split(",")));
//if(indicesToLog.containsAll(indices)) {

// The query will be empty when there is no query, e.g. /_search
final String query = searchRequest.source().toString();
// Get all search hits from the response.
if (response instanceof SearchResponse) {

// Create a UUID for this search response.
final String queryResponseId = UUID.randomUUID().toString();
// Create a UUID for this search request.
final String queryId = UUID.randomUUID().toString();

final List<String> queryResponseHitIds = new LinkedList<>();
// The query will be empty when there is no query, e.g. /_search
final String query = searchRequest.source().toString();

final SearchResponse searchResponse = (SearchResponse) response;
// Create a UUID for this search response.
final String queryResponseId = UUID.randomUUID().toString();

// Add each hit to the list of query responses.
searchResponse.getHits().forEach(hit -> {
queryResponseHitIds.add(String.valueOf(hit.docId()));
});
final List<String> queryResponseHitIds = new LinkedList<>();

try {
final SearchResponse searchResponse = (SearchResponse) response;

// Persist the query to the backend.
backend.persistQuery(eventStore,
new QueryRequest(queryId, query),
new QueryResponse(queryId, queryResponseId, queryResponseHitIds));
// Add each hit to the list of query responses.
searchResponse.getHits().forEach(hit -> {
queryResponseHitIds.add(String.valueOf(hit.docId()));
});

} catch (Exception ex) {
// TODO: Handle this.
LOGGER.error("Unable to persist query.", ex);
}
try {

threadPool.getThreadContext().addResponseHeader("query_id", queryId);
// Persist the query to the backend.
backend.persistQuery(eventStore,
new QueryRequest(queryId, query),
new QueryResponse(queryId, queryResponseId, queryResponseHitIds));

} catch (Exception ex) {
// TODO: Handle this.
LOGGER.error("Unable to persist query.", ex);
}

threadPool.getThreadContext().addResponseHeader("query_id", queryId);

//}

final long elapsedTime = System.currentTimeMillis() - startTime;
Expand Down

0 comments on commit 2f7ba3c

Please sign in to comment.