diff --git a/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java b/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java index 239051d78d6..248285118e8 100644 --- a/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java +++ b/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java @@ -139,6 +139,7 @@ protected T execute(HttpRequestBase request, IOFunction resultConverter, @PostConstruct protected void init() { + log.info("Starting up RemoteHttpService " + System.identityHashCode(this)); objectMapper = objectMapperDecorator.decorate(new ObjectMapper()); voidResponseReader = objectMapper.readerFor(VoidResponse.class); @@ -193,11 +194,14 @@ protected void init() { log.error("Unable to retrieve aliases from KeyStore."); throw new IllegalStateException(e); } + log.info("Started up RemoteHttpService " + System.identityHashCode(this)); } @PreDestroy protected void shutdown() { + log.info("Shutting down RemoteHttpService " + System.identityHashCode(this)); executorService.submit(() -> { + log.info("Executing shutdown RemoteHttpService " + System.identityHashCode(RemoteHttpService.this)); long waitStart = System.currentTimeMillis(); long totalWait = 0; while (activeExecutions.get() > 0 && totalWait < 60000L) { @@ -213,7 +217,7 @@ protected void shutdown() { } catch (IOException e) { log.warn("Exception while shutting down HttpClient: " + e.getMessage(), e); } - + log.info("Shutdown RemoteHttpService " + System.identityHashCode(RemoteHttpService.this)); }); } diff --git a/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryConfiguration.java b/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryConfiguration.java new file mode 100644 index 00000000000..69924d2d3ee --- /dev/null +++ b/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryConfiguration.java @@ -0,0 +1,108 @@ +package datawave.webservice.query.logic.composite; + +import datawave.webservice.query.Query; +import datawave.webservice.query.QueryImpl; +import datawave.webservice.query.configuration.GenericQueryConfiguration; + +import java.io.Serializable; + +public class CompositeQueryConfiguration extends GenericQueryConfiguration implements Serializable { + + private Query query = null; + + // Specifies whether all queries must succeed initialization + private boolean allMustInitialize = false; + + // Specifies whether queries are run sequentially. We stop after the first query that returns any results. + private boolean shortCircuitExecution = false; + + public CompositeQueryConfiguration() { + super(); + query = new QueryImpl(); + } + + /** + * Performs a deep copy of the provided CompositeQueryConfiguration into a new instance + * + * @param other + * - another CompositeQueryConfiguration instance + */ + public CompositeQueryConfiguration(CompositeQueryConfiguration other) { + + // GenericQueryConfiguration copy first + super(other); + } + + /** + * Factory method that instantiates an fresh CompositeQueryConfiguration + * + * @return - a clean CompositeQueryConfiguration + */ + public static CompositeQueryConfiguration create() { + return new CompositeQueryConfiguration(); + } + + /** + * Factory method that returns a deep copy of the provided CompositeQueryConfiguration + * + * @param other + * - another instance of a CompositeQueryConfiguration + * @return - copy of provided CompositeQueryConfiguration + */ + public static CompositeQueryConfiguration create(CompositeQueryConfiguration other) { + return new CompositeQueryConfiguration(other); + } + + /** + * Factory method that creates a CompositeQueryConfiguration deep copy from a CompositeQueryLogic + * + * @param compositeQueryLogic + * - a configured CompositeQueryLogic + * @return - a CompositeQueryConfiguration + */ + public static CompositeQueryConfiguration create(CompositeQueryLogic compositeQueryLogic) { + + CompositeQueryConfiguration config = create(compositeQueryLogic.getConfig()); + + return config; + } + + /** + * Factory method that creates a CompositeQueryConfiguration from a CompositeQueryLogic and a Query + * + * @param compositeQueryLogic + * - a configured CompositeQueryLogic + * @param query + * - a configured Query object + * @return - a CompositeQueryConfiguration + */ + public static CompositeQueryConfiguration create(CompositeQueryLogic compositeQueryLogic, Query query) { + CompositeQueryConfiguration config = create(compositeQueryLogic); + config.setQuery(query); + return config; + } + + public Query getQuery() { + return query; + } + + public void setQuery(Query query) { + this.query = query; + } + + public boolean isAllMustInitialize() { + return allMustInitialize; + } + + public void setAllMustInitialize(boolean allMustInitialize) { + this.allMustInitialize = allMustInitialize; + } + + public boolean isShortCircuitExecution() { + return shortCircuitExecution; + } + + public void setShortCircuitExecution(boolean shortCircuitExecution) { + this.shortCircuitExecution = shortCircuitExecution; + } +} diff --git a/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogic.java b/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogic.java index 8e2a6edb4db..16e4efc3c52 100644 --- a/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogic.java +++ b/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogic.java @@ -1,5 +1,27 @@ package datawave.webservice.query.logic.composite; +import com.google.common.base.Joiner; +import datawave.audit.SelectorExtractor; +import datawave.security.authorization.AuthorizationException; +import datawave.security.authorization.DatawavePrincipal; +import datawave.security.util.AuthorizationsUtil; +import datawave.webservice.common.connection.AccumuloConnectionFactory.Priority; +import datawave.security.authorization.UserOperations; +import datawave.webservice.query.Query; +import datawave.webservice.query.cache.ResultsPage; +import datawave.webservice.query.configuration.GenericQueryConfiguration; +import datawave.webservice.query.exception.EmptyObjectException; +import datawave.webservice.query.logic.BaseQueryLogic; +import datawave.webservice.query.logic.QueryLogic; +import datawave.webservice.query.logic.QueryLogicTransformer; +import datawave.webservice.result.BaseResponse; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.commons.collections4.functors.NOPTransformer; +import org.apache.commons.collections4.iterators.TransformIterator; +import org.apache.log4j.Logger; + import java.security.Principal; import java.util.ArrayList; import java.util.Collection; @@ -11,6 +33,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; @@ -37,9 +60,10 @@ import datawave.webservice.result.BaseResponse; /** - * Query Logic implementation that is configured with more than one query logic delegate. The queries are run in parallel and results are retrieved as they come - * back from the delegates. This class restricts the delegates such that they have to return the same type of response object and two query logics with the same - * class name and tableName cannot be configured. + * Query Logic implementation that is configured with more than one query logic delegate. The queries are run in parallel unless configured to be sequential. + * Results are retrieved as they come back from the delegates. This class restricts the delegates such that they have to return the same type of response + * object. If configured to run sequentially, then the execution will terminate after the first query that returns results. Query logics will be sorted by their + * configured name. */ public class CompositeQueryLogic extends BaseQueryLogic { @@ -59,7 +83,11 @@ public QueryLogicHolder(String logicName, QueryLogic logic) { this.setLogic(logic); this.setName(Thread.currentThread().getName() + "-CompositeQueryLogic-" + logicName); } - + + public boolean wasStarted() { + return started; + } + public String getLogicName() { return logicName; } @@ -138,6 +166,9 @@ public void run() { else { throw new RuntimeException(e); } + } catch (EmptyObjectException eoe) { + // Adding an empty object exception to the results queue needs to be passed all the way out. + results.add(eoe); } resultCount++; } @@ -156,35 +187,33 @@ public void run() { protected static final Logger log = Logger.getLogger(CompositeQueryLogic.class); + private CompositeQueryConfiguration config; + private Map> queryLogics = null; - - // Specified whether all queries must succeed initialization - private boolean allMustInitialize = false; - + private QueryLogicTransformer transformer; private Priority p = Priority.NORMAL; private volatile boolean interrupted = false; private volatile CountDownLatch startLatch = null; private volatile CountDownLatch completionLatch = null; private Map logicState = new HashMap<>(); + private Map> failedQueryLogics = new HashMap<>(); private volatile CompositeQueryLogicResults results = null; public CompositeQueryLogic() {} public CompositeQueryLogic(CompositeQueryLogic other) { super(other); - if (other.queryLogics != null) { - this.queryLogics = new HashMap<>(); - for (Map.Entry> entry : other.queryLogics.entrySet()) { - try { - this.queryLogics.put(entry.getKey(), (QueryLogic) entry.getValue().clone()); - } catch (CloneNotSupportedException e) { - throw new RuntimeException(e); - } + this.config = CompositeQueryConfiguration.create(other); + this.queryLogics = new TreeMap<>(); + for (Map.Entry> entry : other.getAllQueryLogics().entrySet()) { + try { + this.queryLogics.put(entry.getKey(), (QueryLogic) entry.getValue().clone()); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); } - setPrincipal(other.getPrincipal()); } - this.allMustInitialize = other.allMustInitialize; + setPrincipal(other.getPrincipal()); } public Set updateRuntimeAuthorizationsAndQueryAuths(QueryLogic logic, Query settings) throws AuthorizationException { @@ -216,12 +245,17 @@ public Set updateRuntimeAuthorizationsAndQueryAuths(QueryLogic runtimeQueryAuthorizations) throws Exception { - StringBuilder logicQueryStringBuilder = new StringBuilder("CompositeQueryLogic: "); + + StringBuilder logicQueryStringBuilder = new StringBuilder(); + if (!getInitializedLogics().isEmpty()) { + logicQueryStringBuilder.append(getConfig().getQueryString()); + } else { + logicQueryStringBuilder.append("CompositeQueryLogic: "); + } + Map exceptions = new HashMap<>(); - if (this.queryLogics != null) { - Iterator>> itr = this.queryLogics.entrySet().iterator(); - while (itr.hasNext()) { - Map.Entry> next = itr.next(); + if (!getUninitializedLogics().isEmpty()) { + for (Map.Entry> next : getUninitializedLogics().entrySet()) { String logicName = next.getKey(); QueryLogic logic = next.getValue(); GenericQueryConfiguration config = null; @@ -236,50 +270,79 @@ public GenericQueryConfiguration initialize(AccumuloClient client, Query setting if (logicQueryStringBuilder.length() > 0) { logicQueryStringBuilder.append(" || "); } - logicQueryStringBuilder.append("( ( logic = '").append(logic.getLogicName()).append("' )"); + logicQueryStringBuilder.append("( ( logic = '").append(logicName).append("' )"); logicQueryStringBuilder.append(" && ").append(config.getQueryString()).append(" )"); QueryLogicHolder holder = new QueryLogicHolder(logicName, logic); holder.setConfig(config); holder.setSettings(settingsCopy); holder.setMaxResults(logic.getMaxResults()); logicState.put(logicName, holder); + + // if doing sequential execution, then stop since we have one initialized + if (isShortCircuitExecution()) { + break; + } + } catch (Exception e) { exceptions.put(logicName, e); log.error("Failed to initialize " + logic.getClass().getName(), e); - itr.remove(); + failedQueryLogics.put(logicName, logic); + } finally { + queryLogics.remove(next.getKey()); } } - } - - if (!exceptions.isEmpty()) { - if (logicState.isEmpty()) { - // all logics have failed to initialize, rethrow the last exception caught - throw new CompositeLogicException("All logics have failed to initialize", exceptions); + + // if something failed initialization + if (!exceptions.isEmpty()) { + if (logicState.isEmpty()) { + // all logics have failed to initialize, rethrow the last exception caught + throw new CompositeLogicException("All logics have failed to initialize", exceptions); + } + + // if all must initialize successfully, then pass up an exception + if (isAllMustInitialize()) { + throw new CompositeLogicException("Failed to initialize all composite child logics", exceptions); + } } - - // if all must initialize successfully, then pass up an exception - if (allMustInitialize) { - throw new CompositeLogicException("Failed to initialize all composite child logics", exceptions); + + // if results is already set, then we were merely adding a new query logic to the mix + if (this.results == null) { + this.results = new CompositeQueryLogicResults(this, Math.min(settings.getPagesize() * 2, 1000)); } - } - - startLatch = new CountDownLatch(logicState.size()); - completionLatch = new CountDownLatch(logicState.size()); - this.results = new CompositeQueryLogicResults(Math.min(settings.getPagesize() * 2, 1000), completionLatch); - if (log.isDebugEnabled()) { - log.debug("CompositeQuery initialized with the following queryLogics: "); - for (Entry entry : this.logicState.entrySet()) { - log.debug("\tLogicName: " + entry.getKey() + ", tableName: " + entry.getValue().getLogic().getTableName()); + + if (log.isDebugEnabled()) { + log.debug("CompositeQuery initialized with the following queryLogics: "); + for (Entry> entry : getInitializedLogics().entrySet()) { + log.debug("\nLogicName: " + entry.getKey() + ", tableName: " + entry.getValue().getTableName()); + } + if (isShortCircuitExecution()) { + for (Entry> entry : getUninitializedLogics().entrySet()) { + log.debug("\npending LogicName: " + entry.getKey() + ", tableName: " + entry.getValue().getTableName()); + } + } } + + final String compositeQueryString = logicQueryStringBuilder.toString(); + CompositeQueryConfiguration config = getConfig(); + config.setQueryString(compositeQueryString); + config.setClient(client); + config.setQuery(settings); + config.setAuthorizations(runtimeQueryAuthorizations); + } + return getConfig(); + } + + @Override + public CompositeQueryConfiguration getConfig() { + if (config == null) { + config = CompositeQueryConfiguration.create(); } - - final String compositeQueryString = logicQueryStringBuilder.toString(); - return new GenericQueryConfiguration() { - @Override - public String getQueryString() { - return compositeQueryString; - } - }; + + return config; + } + + public void setConfig(CompositeQueryConfiguration config) { + this.config = config; } @Override @@ -289,7 +352,7 @@ public String getPlan(AccumuloClient client, Query settings, Set StringBuilder plans = new StringBuilder(); int count = 1; String separator = Integer.toString(count++) + ": "; - for (Map.Entry> entry : queryLogics.entrySet()) { + for (Map.Entry> entry : getQueryLogics().entrySet()) { // duplicate the settings for this query Query settingsCopy = settings.duplicate(settings.getQueryName() + " -> " + entry.getKey()); @@ -305,14 +368,26 @@ public String getPlan(AccumuloClient client, Query settings, Set @Override public void setupQuery(GenericQueryConfiguration configuration) throws Exception { + int count = 0; + for (QueryLogicHolder holder : logicState.values()) { - holder.getLogic().setupQuery(holder.getConfig()); - TransformIterator transformIterator = holder.getLogic().getTransformIterator(holder.getSettings()); - holder.setTransformIterator(transformIterator); + if (!holder.wasStarted()) { + holder.getLogic().setupQuery(holder.getConfig()); + TransformIterator transformIterator = holder.getLogic().getTransformIterator(holder.getSettings()); + holder.setTransformIterator(transformIterator); + count++; + } } + + startLatch = new CountDownLatch(count); + completionLatch = new CountDownLatch(count); + for (QueryLogicHolder holder : logicState.values()) { - holder.start(); + if (!holder.wasStarted()) { + holder.start(); + } } + // Wait until all threads have started startLatch.await(); log.trace("All threads have started."); @@ -335,17 +410,15 @@ public synchronized QueryLogicTransformer getTransformer(Query settings) { ResultsPage emptyList = new ResultsPage(); Class responseClass = null; List delegates = new ArrayList<>(); - if (this.queryLogics != null) { - for (QueryLogic logic : this.queryLogics.values()) { - QueryLogicTransformer t = logic.getTransformer(settings); - delegates.add(t); - BaseResponse refResponse = t.createResponse(emptyList); - if (null == responseClass) { - responseClass = refResponse.getClass(); - } else { - if (!responseClass.equals(refResponse.getClass())) { - throw new RuntimeException("All query logics must use transformers that return the same object type"); - } + for (QueryLogic logic : getQueryLogics().values()) { + QueryLogicTransformer t = logic.getTransformer(settings); + delegates.add(t); + BaseResponse refResponse = t.createResponse(emptyList); + if (null == responseClass) { + responseClass = refResponse.getClass(); + } else { + if (!responseClass.equals(refResponse.getClass())) { + throw new RuntimeException("All query logics must use transformers that return the same object type"); } } } @@ -391,11 +464,46 @@ public void close() { } public Map> getQueryLogics() { - return this.queryLogics; + TreeMap> logics = new TreeMap<>(); + logics.putAll(getUninitializedLogics()); + logics.putAll(getInitializedLogics()); + return logics; + } + + public Map> getAllQueryLogics() { + TreeMap> logics = new TreeMap<>(); + logics.putAll(getUninitializedLogics()); + logics.putAll(getInitializedLogics()); + logics.putAll(getFailedLogics()); + return logics; + } + + public Map> getFailedLogics() { + if (failedQueryLogics != null) { + return failedQueryLogics; + } else { + return new HashMap<>(); + } + } + + public Map> getUninitializedLogics() { + if (queryLogics != null) { + return new TreeMap<>(queryLogics); + } else { + return new TreeMap<>(); + } + } + + public Map> getInitializedLogics() { + TreeMap> logics = new TreeMap<>(); + if (logicState != null) { + logicState.entrySet().forEach(e -> logics.put(e.getKey(), e.getValue().getLogic())); + } + return logics; } public void setQueryLogics(Map> queryLogics) { - this.queryLogics = queryLogics; + this.queryLogics = new TreeMap<>(queryLogics); } public UserOperations getUserOperations() { @@ -404,7 +512,7 @@ public UserOperations getUserOperations() { // query logics boolean includeLocal = false; List userOperations = new ArrayList<>(); - for (QueryLogic logic : this.queryLogics.values()) { + for (QueryLogic logic : getQueryLogics().values()) { UserOperations ops = logic.getUserOperations(); if (ops == null) { includeLocal = true; @@ -420,29 +528,21 @@ public UserOperations getUserOperations() { @Override public boolean canRunQuery(Principal principal) { - if (this.queryLogics == null) { - return false; - } - // user can run this composite query if they can run at least one of the configured query logics - Iterator> itr = this.queryLogics.values().iterator(); - while (itr.hasNext()) { - QueryLogic logic = itr.next(); - if (!logic.canRunQuery(principal)) { - itr.remove(); + for (Map.Entry> entry : getUninitializedLogics().entrySet()) { + if (!entry.getValue().canRunQuery(principal)) { + queryLogics.remove(entry.getKey()); } } - return (!this.queryLogics.isEmpty()); + return (!getUninitializedLogics().isEmpty()); } @Override public Set getOptionalQueryParameters() { Set params = new TreeSet<>(); // Create a UNION set. Should it be an intersection? - if (this.queryLogics != null) { - for (QueryLogic l : this.queryLogics.values()) { - params.addAll(l.getOptionalQueryParameters()); - } + for (QueryLogic l : getQueryLogics().values()) { + params.addAll(l.getOptionalQueryParameters()); } return params; } @@ -450,10 +550,8 @@ public Set getOptionalQueryParameters() { @Override public Set getRequiredQueryParameters() { Set params = new TreeSet<>(); - if (this.queryLogics != null) { - for (QueryLogic l : this.queryLogics.values()) { - params.addAll(l.getRequiredQueryParameters()); - } + for (QueryLogic l : getQueryLogics().values()) { + params.addAll(l.getRequiredQueryParameters()); } return params; } @@ -461,12 +559,10 @@ public Set getRequiredQueryParameters() { @Override public Set getExampleQueries() { Set params = new TreeSet<>(); - if (this.queryLogics != null) { - for (QueryLogic l : this.queryLogics.values()) { - Set examples = l.getExampleQueries(); - if (examples != null) { - params.addAll(examples); - } + for (QueryLogic l : getQueryLogics().values()) { + Set examples = l.getExampleQueries(); + if (examples != null) { + params.addAll(examples); } } return params.isEmpty() ? null : params; @@ -480,11 +576,9 @@ public Set getExampleQueries() { @Override public boolean canRunQuery() { if (super.canRunQuery()) { - if (this.queryLogics != null) { - for (QueryLogic logic : this.queryLogics.values()) { - if (logic.canRunQuery()) { - return true; - } + for (QueryLogic logic : getQueryLogics().values()) { + if (logic.canRunQuery()) { + return true; } } } @@ -498,12 +592,10 @@ public boolean canRunQuery() { */ @Override public SelectorExtractor getSelectorExtractor() { - if (this.queryLogics != null) { - for (QueryLogic logic : this.queryLogics.values()) { - SelectorExtractor extractor = logic.getSelectorExtractor(); - if (extractor != null) { - return extractor; - } + for (QueryLogic logic : getQueryLogics().values()) { + SelectorExtractor extractor = logic.getSelectorExtractor(); + if (extractor != null) { + return extractor; } } return null; @@ -518,10 +610,8 @@ public SelectorExtractor getSelectorExtractor() { @Override public void setPrincipal(Principal principal) { super.setPrincipal(principal); - if (this.queryLogics != null) { - for (QueryLogic logic : this.queryLogics.values()) { - logic.setPrincipal(principal); - } + for (QueryLogic logic : getQueryLogics().values()) { + logic.setPrincipal(principal); } } @@ -534,18 +624,40 @@ public void setPrincipal(Principal principal) { @Override public void setPageProcessingStartTime(long pageProcessingStartTime) { super.setPageProcessingStartTime(pageProcessingStartTime); - if (this.queryLogics != null) { - for (QueryLogic logic : this.queryLogics.values()) { - logic.setPageProcessingStartTime(pageProcessingStartTime); - } + for (QueryLogic logic : getQueryLogics().values()) { + logic.setPageProcessingStartTime(pageProcessingStartTime); } } public boolean isAllMustInitialize() { - return allMustInitialize; + return getConfig().isAllMustInitialize(); } public void setAllMustInitialize(boolean allMustInitialize) { - this.allMustInitialize = allMustInitialize; + getConfig().setAllMustInitialize(allMustInitialize); + } + + public boolean isShortCircuitExecution() { + return getConfig().isShortCircuitExecution(); + } + + public void setShortCircuitExecution(boolean shortCircuit) { + getConfig().setShortCircuitExecution(shortCircuit); + } + + public Query getSettings() { + return getConfig().getQuery(); + } + + public void setSettings(Query settings) { + getConfig().setQuery(settings); + } + + public CountDownLatch getStartLatch() { + return startLatch; + } + + public CountDownLatch getCompletionLatch() { + return completionLatch; } } diff --git a/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogicResults.java b/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogicResults.java index a9d482c9c07..952d1cbb1d1 100644 --- a/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogicResults.java +++ b/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogicResults.java @@ -10,15 +10,22 @@ import org.apache.commons.collections.keyvalue.UnmodifiableMapEntry; public class CompositeQueryLogicResults implements Iterable, Thread.UncaughtExceptionHandler { - + + private final CompositeQueryLogic logic; private final ArrayBlockingQueue results; - private final CountDownLatch completionLatch; private final List handlers; private final List> exceptions; - - public CompositeQueryLogicResults(int pagesize, CountDownLatch completionLatch) { + + public CompositeQueryLogicResults() { + this.logic = null; + this.results = new ArrayBlockingQueue<>(1); + this.handlers = new ArrayList<>(); + this.exceptions = new ArrayList<>(); + } + + public CompositeQueryLogicResults(CompositeQueryLogic logic, int pagesize) { + this.logic = logic; this.results = new ArrayBlockingQueue<>(pagesize); - this.completionLatch = completionLatch; this.handlers = new ArrayList<>(); this.exceptions = new ArrayList<>(); } @@ -41,7 +48,7 @@ public boolean contains(Object o) { @Override public Iterator iterator() { - CompositeQueryLogicResultsIterator it = new CompositeQueryLogicResultsIterator(this.results, this.completionLatch); + CompositeQueryLogicResultsIterator it = new CompositeQueryLogicResultsIterator(logic, this.results); synchronized (handlers) { // first pass any exceptions we have already seen for (Map.Entry exception : exceptions) { diff --git a/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogicResultsIterator.java b/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogicResultsIterator.java index bc984e519fa..63956cc98e1 100644 --- a/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogicResultsIterator.java +++ b/web-services/query/src/main/java/datawave/webservice/query/logic/composite/CompositeQueryLogicResultsIterator.java @@ -1,8 +1,12 @@ package datawave.webservice.query.logic.composite; +import com.google.common.base.Throwables; +import datawave.webservice.query.configuration.GenericQueryConfiguration; +import datawave.webservice.query.exception.EmptyObjectException; +import org.apache.log4j.Logger; + import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; @@ -12,16 +16,18 @@ public class CompositeQueryLogicResultsIterator implements Iterator, Thread.UncaughtExceptionHandler { protected static final Logger log = Logger.getLogger(CompositeQueryLogicResultsIterator.class); - + + private final CompositeQueryLogic logic; + private final ArrayBlockingQueue results; private Object nextEntry = null; + private boolean seenEntries = false; private final Object lock = new Object(); - private final CountDownLatch completionLatch; private volatile Throwable failure = null; - - public CompositeQueryLogicResultsIterator(ArrayBlockingQueue results, CountDownLatch completionLatch) { + + public CompositeQueryLogicResultsIterator(CompositeQueryLogic logic, ArrayBlockingQueue results) { + this.logic = logic; this.results = results; - this.completionLatch = completionLatch; } @Override @@ -30,20 +36,45 @@ public boolean hasNext() { if (failure != null) { Throwables.propagate(failure); } - if (nextEntry != null) - return true; - try { - while (nextEntry == null && failure == null && (!results.isEmpty() || completionLatch.getCount() > 0)) { - nextEntry = results.poll(1, TimeUnit.SECONDS); + while (nextEntry == null) { + try { + while (nextEntry == null && failure == null && (!results.isEmpty() || logic.getCompletionLatch().getCount() > 0)) { + nextEntry = results.poll(1, TimeUnit.SECONDS); + } + if (failure != null) { + Throwables.propagate(failure); + } + if (nextEntry == null) { + // if the current execution threads are complete + // and we are in the sequential execution mode + // and we have not seen an result yet + // and we have more logics to initialize + // then initialize the next logic and continue. + if (logic.getCompletionLatch().getCount() == 0 && logic.isShortCircuitExecution() && !seenEntries + && !logic.getUninitializedLogics().isEmpty()) { + try { + GenericQueryConfiguration config = logic.initialize(logic.getConfig().getClient(), logic.getSettings(), logic.getConfig() + .getAuthorizations()); + logic.setupQuery(config); + } catch (Exception e) { + Throwables.propagate(e); + } + } else { + break; + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); } - if (failure != null) { - Throwables.propagate(failure); - } - return true; - } catch (InterruptedException e) { - throw new RuntimeException(e); } } + if (nextEntry != null) { + if (!(nextEntry instanceof EmptyObjectException)) { + seenEntries = true; + } + return true; + } + return false; } @Override @@ -59,6 +90,9 @@ public Object next() { nextEntry = null; } } + if (current instanceof EmptyObjectException) { + throw new EmptyObjectException(); + } return current; } diff --git a/web-services/query/src/test/java/datawave/webservice/query/logic/composite/CompositeQueryLogicTest.java b/web-services/query/src/test/java/datawave/webservice/query/logic/composite/CompositeQueryLogicTest.java index b4c6499046d..6a12775ffca 100644 --- a/web-services/query/src/test/java/datawave/webservice/query/logic/composite/CompositeQueryLogicTest.java +++ b/web-services/query/src/test/java/datawave/webservice/query/logic/composite/CompositeQueryLogicTest.java @@ -6,12 +6,12 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.data.Key; @@ -43,6 +43,7 @@ import datawave.webservice.query.cache.ResultsPage; import datawave.webservice.query.cache.ResultsPage.Status; import datawave.webservice.query.configuration.GenericQueryConfiguration; +import datawave.webservice.query.exception.EmptyObjectException; import datawave.webservice.query.logic.BaseQueryLogic; import datawave.webservice.query.logic.BaseQueryLogicTransformer; import datawave.webservice.query.logic.DatawaveRoleManager; @@ -180,10 +181,13 @@ public TestQueryLogicTransformer(MarkingFunctions markingFunctions) { } @Override - public TestQueryResponse transform(Entry input) { + public TestQueryResponse transform(Entry input) throws EmptyObjectException { if (input instanceof Entry) { @SuppressWarnings("unchecked") Entry entry = (Entry) input; + if (entry.getValue() == null) { + throw new EmptyObjectException(); + } // first check if we should be failing here if (entry.getKey().equals(keyFailure)) { throw new RuntimeException(entry.getValue().toString()); @@ -191,6 +195,7 @@ public TestQueryResponse transform(Entry input) { TestQueryResponse r = new TestQueryResponse(); r.setKey(entry.getKey().toString()); r.setValue(entry.getValue().toString()); + r.setHasResults(true); return r; } else { throw new IllegalArgumentException("Invalid input type: " + input.getClass()); @@ -216,10 +221,13 @@ public DifferentTestQueryLogicTransformer(MarkingFunctions markingFunctions) { } @Override - public TestQueryResponse transform(Entry input) { + public TestQueryResponse transform(Entry input) throws EmptyObjectException { if (input instanceof Entry) { @SuppressWarnings("unchecked") Entry entry = (Entry) input; + if (entry.getValue() == null) { + throw new EmptyObjectException(); + } TestQueryResponse r = new TestQueryResponse(); r.setKey(entry.getKey().toString()); r.setValue(new String(entry.getValue().get())); @@ -237,9 +245,9 @@ public BaseQueryResponse createResponse(List resultList) { } public static class TestQueryLogic extends BaseQueryLogic> { - - private Map data = new ConcurrentHashMap<>(); - + + private Map data = Collections.synchronizedMap(new LinkedHashMap<>()); + private final UserOperations userOperations; private Set auths; @@ -358,8 +366,9 @@ public GenericResponse flushCachedCredentials(Object callerObject) { } public static class TestQueryLogic2 extends TestQueryLogic { - private Map data = new ConcurrentHashMap<>(); - + + private Map data = Collections.synchronizedMap(new LinkedHashMap<>()); + public Map getData() { return data; } @@ -465,8 +474,8 @@ public void testClone() throws Exception { c.setPrincipal(principal); c.initialize(null, settings, Collections.singleton(auths)); c.getTransformer(settings); - - Assert.assertEquals(2, c.getQueryLogics().size()); + + Assert.assertEquals(2, c.getInitializedLogics().size()); } @Test @@ -489,8 +498,8 @@ public void testInitializeOKWithSameQueryLogicAndTableNames() throws Exception { c.setPrincipal(principal); c.initialize(null, settings, Collections.singleton(auths)); c.getTransformer(settings); - - Assert.assertEquals(2, c.getQueryLogics().size()); + + Assert.assertEquals(2, c.getInitializedLogics().size()); } @Test @@ -524,8 +533,8 @@ public String getTableName() { c.initialize(null, settings, Collections.singleton(auths)); c.getTransformer(settings); - - Assert.assertEquals(2, c.getQueryLogics().size()); + + Assert.assertEquals(2, c.getInitializedLogics().size()); } @Test @@ -549,8 +558,8 @@ public void testInitialize() throws Exception { c.initialize(null, settings, Collections.singleton(auths)); c.getTransformer(settings); - - Assert.assertEquals(2, c.getQueryLogics().size()); + + Assert.assertEquals(2, c.getInitializedLogics().size()); } @Test @@ -578,8 +587,8 @@ public GenericQueryConfiguration initialize(AccumuloClient connection, Query set c.setPrincipal(principal); c.initialize(null, settings, Collections.singleton(auths)); - - Assert.assertEquals(1, c.getQueryLogics().size()); + + Assert.assertEquals(1, c.getInitializedLogics().size()); } @Test(expected = CompositeLogicException.class) @@ -792,7 +801,273 @@ public void testQueryLogic() throws Exception { c.close(); } - + + @Test + public void testQueryLogicWithEmptyEvent() throws Exception { + Map> logics = new HashMap<>(); + TestQueryLogic logic1 = new TestQueryLogic(); + TestQueryLogic2 logic2 = new TestQueryLogic2(); + logics.put("TestQueryLogic", logic1); + logics.put("TestQueryLogic2", logic2); + + logic1.getData().put(key1, value1); + logic1.getData().put(key2, null); + logic2.getData().put(key3, value3); + logic2.getData().put(key4, value4); + logic1.getData().put(key5, value5); + logic1.getData().put(key6, value6); + logic2.getData().put(key7, value7); + logic2.getData().put(key8, value8); + + QueryImpl settings = new QueryImpl(); + settings.setPagesize(100); + settings.setQueryAuthorizations(auths.toString()); + settings.setQuery("FOO == 'BAR'"); + settings.setParameters(new HashSet<>()); + settings.setId(UUID.randomUUID()); + + CompositeQueryLogic c = new CompositeQueryLogic(); + // max.results.override is set to -1 when it is not passed in as it is an optional paramter + logic1.setMaxResults(-1); + logic2.setMaxResults(-1); + /** + * RunningQuery.setupConnection() + */ + c.setQueryLogics(logics); + c.setPrincipal(principal); + c.initialize((AccumuloClient) null, (Query) settings, Collections.singleton(auths)); + c.setupQuery(null); + TransformIterator iter = c.getTransformIterator((Query) settings); + + /** + * RunningQuery.next() - iterate over results coming from tablet server through the TransformIterator to turn them into the objects. + */ + List results = new ArrayList<>(); + while (iter.hasNext()) { + Object o = iter.next(); + if (null == o) + break; + Assert.assertTrue(o instanceof TestQueryResponse); + results.add((TestQueryResponse) o); + } + Assert.assertEquals(7, results.size()); + ResultsPage page = new ResultsPage(results, Status.COMPLETE); + + /** + * QueryExecutorBean.next() - transform list of objects into JAXB response + */ + TestQueryResponseList response = (TestQueryResponseList) c.getEnrichedTransformer((Query) settings).createResponse(page); + Assert.assertEquals(7, response.getResponses().size()); + for (TestQueryResponse r : response.getResponses()) { + Assert.assertNotNull(r); + } + + c.close(); + + } + + @Test + public void testQueryLogicShortCircuitExecution() throws Exception { + Map> logics = new HashMap<>(); + TestQueryLogic logic1 = new TestQueryLogic(); + TestQueryLogic2 logic2 = new TestQueryLogic2(); + logics.put("TestQueryLogic", logic1); + logics.put("TestQueryLogic2", logic2); + + logic1.getData().put(key1, value1); + logic1.getData().put(key2, value2); + logic2.getData().put(key3, value3); + logic2.getData().put(key4, value4); + logic1.getData().put(key5, value5); + logic1.getData().put(key6, value6); + logic2.getData().put(key7, value7); + logic2.getData().put(key8, value8); + + QueryImpl settings = new QueryImpl(); + settings.setPagesize(100); + settings.setQueryAuthorizations(auths.toString()); + settings.setQuery("FOO == 'BAR'"); + settings.setParameters(new HashSet<>()); + settings.setId(UUID.randomUUID()); + + CompositeQueryLogic c = new CompositeQueryLogic(); + // max.results.override is set to -1 when it is not passed in as it is an optional paramter + logic1.setMaxResults(-1); + logic2.setMaxResults(-1); + /** + * RunningQuery.setupConnection() + */ + c.setQueryLogics(logics); + c.setPrincipal(principal); + c.setShortCircuitExecution(true); + c.initialize((AccumuloClient) null, (Query) settings, Collections.singleton(auths)); + c.setupQuery(null); + TransformIterator iter = c.getTransformIterator((Query) settings); + + /** + * RunningQuery.next() - iterate over results coming from tablet server through the TransformIterator to turn them into the objects. + */ + List results = new ArrayList<>(); + while (iter.hasNext()) { + Object o = iter.next(); + if (null == o) + break; + Assert.assertTrue(o instanceof TestQueryResponse); + results.add((TestQueryResponse) o); + } + // only half the results if both had been run + Assert.assertEquals(4, results.size()); + ResultsPage page = new ResultsPage(results, Status.COMPLETE); + Assert.assertFalse(c.getUninitializedLogics().isEmpty()); + Assert.assertFalse(c.getInitializedLogics().isEmpty()); + + /** + * QueryExecutorBean.next() - transform list of objects into JAXB response + */ + TestQueryResponseList response = (TestQueryResponseList) c.getEnrichedTransformer((Query) settings).createResponse(page); + Assert.assertEquals(4, response.getResponses().size()); + for (TestQueryResponse r : response.getResponses()) { + Assert.assertNotNull(r); + } + + c.close(); + + } + + @Test + public void testQueryLogicShortCircuitExecutionWithEmptyEvent() throws Exception { + Map> logics = new HashMap<>(); + TestQueryLogic logic1 = new TestQueryLogic(); + TestQueryLogic2 logic2 = new TestQueryLogic2(); + logics.put("TestQueryLogic", logic1); + logics.put("TestQueryLogic2", logic2); + + logic1.getData().put(key1, value1); + logic1.getData().put(key2, null); + logic2.getData().put(key3, value3); + logic2.getData().put(key4, value4); + logic1.getData().put(key5, value5); + logic1.getData().put(key6, value6); + logic2.getData().put(key7, value7); + logic2.getData().put(key8, value8); + + QueryImpl settings = new QueryImpl(); + settings.setPagesize(100); + settings.setQueryAuthorizations(auths.toString()); + settings.setQuery("FOO == 'BAR'"); + settings.setParameters(new HashSet<>()); + settings.setId(UUID.randomUUID()); + + CompositeQueryLogic c = new CompositeQueryLogic(); + // max.results.override is set to -1 when it is not passed in as it is an optional paramter + logic1.setMaxResults(-1); + logic2.setMaxResults(-1); + /** + * RunningQuery.setupConnection() + */ + c.setQueryLogics(logics); + c.setPrincipal(principal); + c.setShortCircuitExecution(true); + c.initialize((AccumuloClient) null, (Query) settings, Collections.singleton(auths)); + c.setupQuery(null); + TransformIterator iter = c.getTransformIterator((Query) settings); + + /** + * RunningQuery.next() - iterate over results coming from tablet server through the TransformIterator to turn them into the objects. + */ + List results = new ArrayList<>(); + while (iter.hasNext()) { + Object o = iter.next(); + if (null == o) + break; + Assert.assertTrue(o instanceof TestQueryResponse); + results.add((TestQueryResponse) o); + } + // only half the results if both had been run + Assert.assertEquals(3, results.size()); + ResultsPage page = new ResultsPage(results, Status.COMPLETE); + Assert.assertFalse(c.getUninitializedLogics().isEmpty()); + Assert.assertFalse(c.getInitializedLogics().isEmpty()); + + /** + * QueryExecutorBean.next() - transform list of objects into JAXB response + */ + TestQueryResponseList response = (TestQueryResponseList) c.getEnrichedTransformer((Query) settings).createResponse(page); + Assert.assertEquals(3, response.getResponses().size()); + for (TestQueryResponse r : response.getResponses()) { + Assert.assertNotNull(r); + } + + c.close(); + + } + + @Test + public void testQueryLogicShortCircuitExecutionHitsSecondLogic() throws Exception { + Map> logics = new HashMap<>(); + TestQueryLogic logic1 = new TestQueryLogic(); + TestQueryLogic2 logic2 = new TestQueryLogic2(); + logics.put("TestQueryLogic", logic1); + logics.put("TestQueryLogic2", logic2); + + logic1.getData().put(key1, null); + logic2.getData().put(key3, value3); + logic2.getData().put(key4, value4); + logic2.getData().put(key7, value7); + logic2.getData().put(key8, value8); + + QueryImpl settings = new QueryImpl(); + settings.setPagesize(100); + settings.setQueryAuthorizations(auths.toString()); + settings.setQuery("FOO == 'BAR'"); + settings.setParameters(new HashSet<>()); + settings.setId(UUID.randomUUID()); + + CompositeQueryLogic c = new CompositeQueryLogic(); + // max.results.override is set to -1 when it is not passed in as it is an optional paramter + logic1.setMaxResults(-1); + logic2.setMaxResults(-1); + /** + * RunningQuery.setupConnection() + */ + c.setQueryLogics(logics); + c.setPrincipal(principal); + c.setShortCircuitExecution(true); + c.initialize((AccumuloClient) null, (Query) settings, Collections.singleton(auths)); + c.setupQuery(null); + TransformIterator iter = c.getTransformIterator((Query) settings); + + /** + * RunningQuery.next() - iterate over results coming from tablet server through the TransformIterator to turn them into the objects. + */ + List results = new ArrayList<>(); + while (iter.hasNext()) { + Object o = iter.next(); + if (null == o) + break; + Assert.assertTrue(o instanceof TestQueryResponse); + results.add((TestQueryResponse) o); + } + Assert.assertEquals(4, results.size()); + ResultsPage page = new ResultsPage(results, Status.COMPLETE); + + // ensure both were actually run + Assert.assertTrue(c.getUninitializedLogics().isEmpty()); + Assert.assertFalse(c.getInitializedLogics().isEmpty()); + + /** + * QueryExecutorBean.next() - transform list of objects into JAXB response + */ + TestQueryResponseList response = (TestQueryResponseList) c.getEnrichedTransformer((Query) settings).createResponse(page); + Assert.assertEquals(4, response.getResponses().size()); + for (TestQueryResponse r : response.getResponses()) { + Assert.assertNotNull(r); + } + + c.close(); + + } + @Test(expected = CompositeLogicException.class) public void testQueryLogicWithNextFailure() throws Exception { Map> logics = new HashMap<>();