Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Merge pull request #60 from o19s/user-session-id
Browse files Browse the repository at this point in the history
Reads user_id and session_id
  • Loading branch information
jzonthemtn authored Feb 22, 2024
2 parents aad40e7 + 2f8e3fe commit a7ae948
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 28 deletions.
22 changes: 20 additions & 2 deletions src/main/java/org/opensearch/ubl/HeaderConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,26 @@

package org.opensearch.ubl;

public class HeaderConstants {
public enum HeaderConstants {

public static final String EVENT_STORE_HEADER = "X-ubi-store";
QUERY_ID_HEADER("X-ubl-query-id"),
EVENT_STORE_HEADER("X-ubl-store"),
USER_ID_HEADER("X-ubl-user-id"),
SESSION_ID_HEADER("X-ubl-session-id");

private final String header;

private HeaderConstants(String header) {
this.header = header;
}

public String getHeader() {
return header;
}

@Override
public String toString() {
return header;
}

}
12 changes: 10 additions & 2 deletions src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,20 @@ public class UserBehaviorLoggingPlugin extends Plugin implements ActionPlugin {

@Override
public Collection<RestHeaderDefinition> getRestHeaders() {
return List.of(new RestHeaderDefinition(HeaderConstants.EVENT_STORE_HEADER, false));
return List.of(
new RestHeaderDefinition(HeaderConstants.EVENT_STORE_HEADER.getHeader(), false),
new RestHeaderDefinition(HeaderConstants.SESSION_ID_HEADER.getHeader(), false),
new RestHeaderDefinition(HeaderConstants.USER_ID_HEADER.getHeader(), false)
);
}

@Override
public Collection<String> getTaskHeaders() {
return List.of(HeaderConstants.EVENT_STORE_HEADER);
return List.of(
HeaderConstants.EVENT_STORE_HEADER.getHeader(),
HeaderConstants.SESSION_ID_HEADER.getHeader(),
HeaderConstants.USER_ID_HEADER.getHeader()
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.core.action.ActionResponse;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Header;
import org.opensearch.ubl.HeaderConstants;
import org.opensearch.ubl.backends.Backend;
import org.opensearch.ubl.model.QueryRequest;
Expand Down Expand Up @@ -64,19 +63,8 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
@Override
public void onResponse(Response response) {

//LOGGER.info("Query ID header: " + task.getHeader("query-id"));

final long startTime = System.currentTimeMillis();

String eventStore = task.getHeader(HeaderConstants.EVENT_STORE_HEADER);

// If there is no event store header we'll use a "default" store.
if(eventStore == null || eventStore.trim().isEmpty()) {
eventStore = "default";
}

LOGGER.info("Using UBL event store: {}", eventStore);

// Get the search itself.
final SearchRequest searchRequest = (SearchRequest) request;

Expand All @@ -88,8 +76,11 @@ public void onResponse(Response response) {
// Get all search hits from the response.
if (response instanceof SearchResponse) {

// Create a UUID for this search request.
final String queryId = UUID.randomUUID().toString();
// Get info from the headers.
final String queryId = getHeaderValue(HeaderConstants.QUERY_ID_HEADER, UUID.randomUUID().toString(), task);
final String eventStore = getHeaderValue(HeaderConstants.EVENT_STORE_HEADER, "default", task);
final String userId = getHeaderValue(HeaderConstants.USER_ID_HEADER, "", task);
final String sessionId = getHeaderValue(HeaderConstants.SESSION_ID_HEADER, "", task);

// The query will be empty when there is no query, e.g. /_search
final String query = searchRequest.source().toString();
Expand All @@ -98,19 +89,16 @@ public void onResponse(Response response) {
final String queryResponseId = UUID.randomUUID().toString();

final List<String> queryResponseHitIds = new LinkedList<>();

final SearchResponse searchResponse = (SearchResponse) response;

// Add each hit to the list of query responses.
searchResponse.getHits().forEach(hit -> {
queryResponseHitIds.add(String.valueOf(hit.docId()));
});
searchResponse.getHits().forEach(hit -> queryResponseHitIds.add(String.valueOf(hit.docId())));

try {

// Persist the query to the backend.
backend.persistQuery(eventStore,
new QueryRequest(queryId, query),
new QueryRequest(queryId, query, userId, sessionId),
new QueryResponse(queryId, queryResponseId, queryResponseHitIds));

} catch (Exception ex) {
Expand Down Expand Up @@ -140,4 +128,16 @@ public void onFailure(Exception ex) {

}

private String getHeaderValue(final HeaderConstants header, final String defaultValue, final Task task) {

final String value = task.getHeader(header.getHeader());

if(value == null || value.trim().isEmpty()) {
return defaultValue;
} else {
return value;
}

}

}
11 changes: 7 additions & 4 deletions src/main/java/org/opensearch/ubl/backends/OpenSearchBackend.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void delete(String storeName) {
final String eventsIndexName = getEventsIndexName(storeName);
final String queriesIndexName = getQueriesIndexName(storeName);
final DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(eventsIndexName, queriesIndexName);

client.admin().indices().delete(deleteIndexRequest);

}
Expand All @@ -102,17 +103,19 @@ public void persistEvent(String storeName, String eventJson) {
}

@Override
public void persistQuery(final String storeName, final QueryRequest queryRequest, QueryResponse queryResponse) throws Exception {
public void persistQuery(final String storeName, final QueryRequest queryRequest, QueryResponse queryResponse) {

LOGGER.info("Writing query ID {} with response ID {}", queryRequest.getQueryId(), queryResponse.getQueryResponseId());

// What will be indexed - adheres to the queries-mapping.json
final Map<String, Object> source = new HashMap<>();
source.put("timestamp", queryRequest.getTimestamp());
source.put("queryId", queryRequest.getQueryId());
source.put("query_id", queryRequest.getQueryId());
source.put("query", queryRequest.getQuery());
source.put("queryResponseId", queryResponse.getQueryResponseId());
source.put("queryResponseHitIds", queryResponse.getQueryResponseHitIds());
source.put("query_response_id", queryResponse.getQueryResponseId());
source.put("query_response_hit_ids", queryResponse.getQueryResponseHitIds());
source.put("user_id", queryRequest.getUserId());
source.put("session_id", queryRequest.getSessionId());

// Get the name of the queries.
final String queriesIndexName = getQueriesIndexName(storeName);
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/org/opensearch/ubl/model/QueryRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ public class QueryRequest {
private final long timestamp;
private final String queryId;
private final String query;
private final String userId;
private final String sessionId;

public QueryRequest(final String queryId, final String query) {
public QueryRequest(final String queryId, final String query, final String userId, final String sessionId) {
this.timestamp = System.currentTimeMillis();
this.queryId = queryId;
this.query = query;
this.userId = userId;
this.sessionId = sessionId;
}

public long getTimestamp() {
Expand All @@ -32,4 +36,12 @@ public String getQuery() {
return query;
}

public String getUserId() {
return userId;
}

public String getSessionId() {
return sessionId;
}

}

0 comments on commit a7ae948

Please sign in to comment.