Skip to content

Commit

Permalink
Rename PipelineRequestContext -> PipelineProcessingContext
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Nov 16, 2023
1 parent 8e561b6 commit bef6d68
Show file tree
Hide file tree
Showing 16 changed files with 45 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchService;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.PipelinedRequestContext;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.StatefulSearchRequestProcessor;
Expand Down Expand Up @@ -44,7 +44,7 @@ private OversampleRequestProcessor(String tag, String description, boolean ignor
}

@Override
public SearchRequest processRequest(SearchRequest request, PipelinedRequestContext requestContext) {
public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) {
if (request.source() != null) {
int originalSize = request.source().size();
if (originalSize == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.opensearch.script.ScriptType;
import org.opensearch.script.SearchScript;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.PipelinedRequestContext;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.StatefulSearchRequestProcessor;
Expand Down Expand Up @@ -76,7 +76,7 @@ public final class ScriptRequestProcessor extends AbstractProcessor implements S
}

@Override
public SearchRequest processRequest(SearchRequest request, PipelinedRequestContext requestContext) throws Exception {
public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception {
// assert request is not null and source is not null
if (request == null || request.source() == null) {
throw new IllegalArgumentException("search request must not be null");
Expand All @@ -94,9 +94,9 @@ public SearchRequest processRequest(SearchRequest request, PipelinedRequestConte
}

private static class RequestContextMap extends BasicMap {
private final PipelinedRequestContext pipelinedRequestContext;
private final PipelineProcessingContext pipelinedRequestContext;

private RequestContextMap(PipelinedRequestContext pipelinedRequestContext) {
private RequestContextMap(PipelineProcessingContext pipelinedRequestContext) {
this.pipelinedRequestContext = pipelinedRequestContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.PipelinedRequestContext;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.search.pipeline.StatefulSearchResponseProcessor;
Expand Down Expand Up @@ -49,7 +49,7 @@ private TruncateHitsResponseProcessor(String tag, String description, boolean ig
}

@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelinedRequestContext requestContext) {
public SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext) {
int size;
if (targetSize < 0) { // No value specified in processor config. Use context value instead.
String key = applyContextPrefix(contextPrefix, OversampleRequestProcessor.ORIGINAL_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.action.search.SearchRequest;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.PipelinedRequestContext;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.common.helpers.ContextUtils;
import org.opensearch.test.OpenSearchTestCase;

Expand All @@ -26,7 +26,7 @@ public void testEmptySource() {
OversampleRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, false, config, null);

SearchRequest request = new SearchRequest();
PipelinedRequestContext context = new PipelinedRequestContext();
PipelineProcessingContext context = new PipelineProcessingContext();
SearchRequest transformedRequest = processor.processRequest(request, context);
assertEquals(request, transformedRequest);
assertNull(context.getAttribute("original_size"));
Expand All @@ -39,7 +39,7 @@ public void testBasicBehavior() {

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(10);
SearchRequest request = new SearchRequest().source(sourceBuilder);
PipelinedRequestContext context = new PipelinedRequestContext();
PipelineProcessingContext context = new PipelineProcessingContext();
SearchRequest transformedRequest = processor.processRequest(request, context);
assertEquals(30, transformedRequest.source().size());
assertEquals(10, context.getAttribute("original_size"));
Expand All @@ -54,7 +54,7 @@ public void testContextPrefix() {

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(10);
SearchRequest request = new SearchRequest().source(sourceBuilder);
PipelinedRequestContext context = new PipelinedRequestContext();
PipelineProcessingContext context = new PipelineProcessingContext();
SearchRequest transformedRequest = processor.processRequest(request, context);
assertEquals(30, transformedRequest.source().size());
assertEquals(10, context.getAttribute("foo.original_size"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.opensearch.script.ScriptType;
import org.opensearch.script.SearchScript;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.PipelinedRequestContext;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.common.helpers.SearchRequestMap;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;
Expand Down Expand Up @@ -86,7 +86,7 @@ public void testScriptingWithoutPrecompiledScriptFactory() throws Exception {
searchRequest.source(createSearchSourceBuilder());

assertNotNull(searchRequest);
processor.processRequest(searchRequest, new PipelinedRequestContext());
processor.processRequest(searchRequest, new PipelineProcessingContext());
assertSearchRequest(searchRequest);
}

Expand All @@ -103,7 +103,7 @@ public void testScriptingWithPrecompiledIngestScript() throws Exception {
searchRequest.source(createSearchSourceBuilder());

assertNotNull(searchRequest);
processor.processRequest(searchRequest, new PipelinedRequestContext());
processor.processRequest(searchRequest, new PipelineProcessingContext());
assertSearchRequest(searchRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.pipeline.PipelinedRequestContext;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.common.helpers.ContextUtils;
import org.opensearch.test.OpenSearchTestCase;

Expand All @@ -32,7 +32,7 @@ public void testBasicBehavior() {

int numHits = randomInt(100);
SearchResponse response = constructResponse(numHits);
SearchResponse transformedResponse = processor.processResponse(new SearchRequest(), response, new PipelinedRequestContext());
SearchResponse transformedResponse = processor.processResponse(new SearchRequest(), response, new PipelineProcessingContext());
assertEquals(Math.min(targetSize, numHits), transformedResponse.getHits().getHits().length);
}

Expand All @@ -43,7 +43,7 @@ public void testTargetSizePassedViaContext() {
int targetSize = randomInt(50);
int numHits = randomInt(100);
SearchResponse response = constructResponse(numHits);
PipelinedRequestContext requestContext = new PipelinedRequestContext();
PipelineProcessingContext requestContext = new PipelineProcessingContext();
requestContext.setAttribute("original_size", targetSize);
SearchResponse transformedResponse = processor.processResponse(new SearchRequest(), response, requestContext);
assertEquals(Math.min(targetSize, numHits), transformedResponse.getHits().getHits().length);
Expand All @@ -57,7 +57,7 @@ public void testTargetSizePassedViaContextWithPrefix() {
int targetSize = randomInt(50);
int numHits = randomInt(100);
SearchResponse response = constructResponse(numHits);
PipelinedRequestContext requestContext = new PipelinedRequestContext();
PipelineProcessingContext requestContext = new PipelineProcessingContext();
requestContext.setAttribute("foo.original_size", targetSize);
SearchResponse transformedResponse = processor.processResponse(new SearchRequest(), response, requestContext);
assertEquals(Math.min(targetSize, numHits), transformedResponse.getHits().getHits().length);
Expand All @@ -71,7 +71,7 @@ public void testTargetSizeMissing() {
SearchResponse response = constructResponse(numHits);
assertThrows(
IllegalStateException.class,
() -> processor.processResponse(new SearchRequest(), response, new PipelinedRequestContext())
() -> processor.processResponse(new SearchRequest(), response, new PipelineProcessingContext())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected void afterResponseProcessor(Processor processor, long timeInNanos) {}

protected void onResponseProcessorFailed(Processor processor) {}

void transformRequest(SearchRequest request, ActionListener<SearchRequest> requestListener, PipelinedRequestContext requestContext)
void transformRequest(SearchRequest request, ActionListener<SearchRequest> requestListener, PipelineProcessingContext requestContext)
throws SearchPipelineProcessingException {
if (searchRequestProcessors.isEmpty()) {
requestListener.onResponse(request);
Expand Down Expand Up @@ -179,7 +179,7 @@ void transformRequest(SearchRequest request, ActionListener<SearchRequest> reque

private ActionListener<SearchRequest> getTerminalSearchRequestActionListener(
ActionListener<SearchRequest> requestListener,
PipelinedRequestContext requestContext
PipelineProcessingContext requestContext
) {
final long pipelineStart = relativeTimeSupplier.getAsLong();

Expand All @@ -198,7 +198,7 @@ private ActionListener<SearchRequest> getTerminalSearchRequestActionListener(
ActionListener<SearchResponse> transformResponseListener(
SearchRequest request,
ActionListener<SearchResponse> responseListener,
PipelinedRequestContext requestContext
PipelineProcessingContext requestContext
) {
if (searchResponseProcessors.isEmpty()) {
// No response transformation necessary
Expand Down Expand Up @@ -266,7 +266,7 @@ <Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
SearchPhaseContext context,
String currentPhase,
String nextPhase,
PipelinedRequestContext requestContext
PipelineProcessingContext requestContext
) throws SearchPipelineProcessingException {
try {
for (SearchPhaseResultsProcessor searchPhaseResultsProcessor : searchPhaseResultsProcessors) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
/**
* A holder for state that is passed through each processor in the pipeline.
*/
public class PipelinedRequestContext {
public class PipelineProcessingContext {
private final Map<String, Object> attributes = new HashMap<>();

/**
* Set a generic attribute in the state for this request. Overwrites any existing value.
*
* @param name the name of the attribute to set
* @param value the value to set on the attribute
* @param value the value to set on the attributen
*/
public void setAttribute(String name, Object value) {
attributes.put(name, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
*/
public final class PipelinedRequest extends SearchRequest {
private final Pipeline pipeline;
private final PipelinedRequestContext requestContext;
private final PipelineProcessingContext requestContext;

PipelinedRequest(Pipeline pipeline, SearchRequest transformedRequest, PipelinedRequestContext requestContext) {
PipelinedRequest(Pipeline pipeline, SearchRequest transformedRequest, PipelineProcessingContext requestContext) {
super(transformedRequest);
this.pipeline = pipeline;
this.requestContext = requestContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ <Result extends SearchPhaseResult> void process(

/**
* Processes the {@link SearchPhaseResults} obtained from a SearchPhase which will be returned to next
* SearchPhase. Receives the {@link PipelinedRequestContext} passed to other processors.
* SearchPhase. Receives the {@link PipelineProcessingContext} passed to other processors.
* @param searchPhaseResult {@link SearchPhaseResults}
* @param searchPhaseContext {@link SearchContext}
* @param requestContext {@link PipelinedRequestContext}
* @param requestContext {@link PipelineProcessingContext}
* @param <Result> {@link SearchPhaseResult}
*/
default <Result extends SearchPhaseResult> void process(
final SearchPhaseResults<Result> searchPhaseResult,
final SearchPhaseContext searchPhaseContext,
final PipelinedRequestContext requestContext
final PipelineProcessingContext requestContext
) {
process(searchPhaseResult, searchPhaseContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
pipeline = pipelineHolder.pipeline;
}
}
PipelinedRequestContext requestContext = new PipelinedRequestContext();
PipelineProcessingContext requestContext = new PipelineProcessingContext();
return new PipelinedRequest(pipeline, searchRequest, requestContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface SearchRequestProcessor extends Processor {
* @return the modified search request
* @throws Exception implementation-specific processing exception
*/
default SearchRequest processRequest(SearchRequest request, PipelinedRequestContext requestContext) throws Exception {
default SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception {
return processRequest(request);
}

Expand All @@ -46,7 +46,7 @@ default SearchRequest processRequest(SearchRequest request, PipelinedRequestCont
*/
default void processRequestAsync(
SearchRequest request,
PipelinedRequestContext requestContext,
PipelineProcessingContext requestContext,
ActionListener<SearchRequest> requestListener
) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface SearchResponseProcessor extends Processor {
* @return the modified search response
* @throws Exception implementation-specific processing exception
*/
default SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelinedRequestContext requestContext)
default SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext)
throws Exception {
return processResponse(request, response);
}
Expand All @@ -57,7 +57,7 @@ default SearchResponse processResponse(SearchRequest request, SearchResponse res
default void processResponseAsync(
SearchRequest request,
SearchResponse response,
PipelinedRequestContext requestContext,
PipelineProcessingContext requestContext,
ActionListener<SearchResponse> responseListener
) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ default SearchRequest processRequest(SearchRequest request) {
}

Check warning on line 21 in server/src/main/java/org/opensearch/search/pipeline/StatefulSearchRequestProcessor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/pipeline/StatefulSearchRequestProcessor.java#L20-L21

Added lines #L20 - L21 were not covered by tests

@Override
SearchRequest processRequest(SearchRequest request, PipelinedRequestContext requestContext) throws Exception;
SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ default SearchResponse processResponse(SearchRequest request, SearchResponse res
}

@Override
SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelinedRequestContext requestContext) throws Exception;
SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1383,9 +1383,9 @@ public void testExtraParameterInProcessorConfig() {

private static class FakeStatefulRequestProcessor extends AbstractProcessor implements StatefulSearchRequestProcessor {
private final String type;
private final Consumer<PipelinedRequestContext> stateConsumer;
private final Consumer<PipelineProcessingContext> stateConsumer;

public FakeStatefulRequestProcessor(String type, Consumer<PipelinedRequestContext> stateConsumer) {
public FakeStatefulRequestProcessor(String type, Consumer<PipelineProcessingContext> stateConsumer) {
super(null, null, false);
this.type = type;
this.stateConsumer = stateConsumer;
Expand All @@ -1397,17 +1397,17 @@ public String getType() {
}

@Override
public SearchRequest processRequest(SearchRequest request, PipelinedRequestContext requestContext) throws Exception {
public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception {
stateConsumer.accept(requestContext);
return request;
}
}

private static class FakeStatefulResponseProcessor extends AbstractProcessor implements StatefulSearchResponseProcessor {
private final String type;
private final Consumer<PipelinedRequestContext> stateConsumer;
private final Consumer<PipelineProcessingContext> stateConsumer;

public FakeStatefulResponseProcessor(String type, Consumer<PipelinedRequestContext> stateConsumer) {
public FakeStatefulResponseProcessor(String type, Consumer<PipelineProcessingContext> stateConsumer) {
super(null, null, false);
this.type = type;
this.stateConsumer = stateConsumer;
Expand All @@ -1419,7 +1419,7 @@ public String getType() {
}

@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelinedRequestContext requestContext)
public SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext)
throws Exception {
stateConsumer.accept(requestContext);
return response;
Expand Down

0 comments on commit bef6d68

Please sign in to comment.