diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 2acdcf3c3b0ab..69a99e34ca146 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -65,9 +65,14 @@ public final class Messages { public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]"; public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]"; public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]"; - public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING = "Finished reindexing to destination index [{0}]"; + public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING = "Started reindexing to destination index [{0}]"; + public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING = + "Finished reindexing to destination index [{0}], took [{1}]"; public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS = "Finished analysis"; public static final String DATA_FRAME_ANALYTICS_AUDIT_RESTORING_STATE = "Restoring from previous model state"; + public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA = "Started loading data"; + public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING = "Started analyzing"; + public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS = "Started writing results"; public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}"; public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed"; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index a04f7d8d3eef3..e1025de84c4ff 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -119,7 +119,11 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws "Starting analytics on node", "Started analytics", expectedDestIndexAuditMessage(), + "Started reindexing to destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField); } @@ -160,7 +164,11 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Excepti "Starting analytics on node", "Started analytics", expectedDestIndexAuditMessage(), + "Started reindexing to destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField); } @@ -223,7 +231,11 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty(String jobId, "Starting analytics on node", "Started analytics", expectedDestIndexAuditMessage(), + "Started reindexing to destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); assertEvaluation(dependentVariable, dependentVariableValues, "ml." + predictedClassField); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index 75c2abe3064e3..ee55209134900 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -233,6 +233,7 @@ protected static void assertThatAuditMessagesMatch(String configId, String... ex // Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start, // finished the job (as this is a very short analytics job), all without the audit being fully written. assertBusy(() -> assertTrue(indexExists(NotificationsIndex.NOTIFICATIONS_INDEX))); + @SuppressWarnings("unchecked") Matcher[] itemMatchers = Arrays.stream(expectedAuditMessagePrefixes).map(Matchers::startsWith).toArray(Matcher[]::new); assertBusy(() -> { @@ -252,6 +253,7 @@ private static List fetchAllAuditMessages(String dataFrameAnalyticsId) { .setIndices(NotificationsIndex.NOTIFICATIONS_INDEX) .addSort("timestamp", SortOrder.ASC) .setQuery(QueryBuilders.termQuery("job_id", dataFrameAnalyticsId)) + .setSize(100) .request(); SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index 2179a18c91b97..0bdf2bd65e40a 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -103,7 +103,11 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws "Starting analytics on node", "Started analytics", "Creating destination index [" + destIndex + "]", + "Started reindexing to destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); } @@ -142,7 +146,11 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Excepti "Starting analytics on node", "Started analytics", "Creating destination index [" + destIndex + "]", + "Started reindexing to destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); } @@ -196,7 +204,11 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception "Starting analytics on node", "Started analytics", "Creating destination index [" + destIndex + "]", + "Started reindexing to destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index 9b51c963f1f45..4ffe948c9f5b7 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -126,7 +126,11 @@ public void testOutlierDetectionWithFewDocuments() throws Exception { "Starting analytics on node", "Started analytics", "Creating destination index [test-outlier-detection-with-few-docs-results]", + "Started reindexing to destination index [test-outlier-detection-with-few-docs-results]", "Finished reindexing to destination index [test-outlier-detection-with-few-docs-results]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); } @@ -181,7 +185,11 @@ public void testOutlierDetectionWithEnoughDocumentsToScroll() throws Exception { "Starting analytics on node", "Started analytics", "Creating destination index [test-outlier-detection-with-enough-docs-to-scroll-results]", + "Started reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]", "Finished reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); } @@ -262,7 +270,11 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex "Starting analytics on node", "Started analytics", "Creating destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]", + "Started reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]", "Finished reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); } @@ -387,7 +399,11 @@ public void testOutlierDetectionWithMultipleSourceIndices() throws Exception { "Starting analytics on node", "Started analytics", "Creating destination index [test-outlier-detection-with-multiple-source-indices-results]", + "Started reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]", "Finished reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); } @@ -445,7 +461,11 @@ public void testOutlierDetectionWithPreExistingDestIndex() throws Exception { "Starting analytics on node", "Started analytics", "Using existing destination index [test-outlier-detection-with-pre-existing-dest-index-results]", + "Started reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]", "Finished reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); } @@ -699,7 +719,11 @@ public void testOutlierDetectionWithCustomParams() throws Exception { "Starting analytics on node", "Started analytics", "Creating destination index [test-outlier-detection-with-custom-params-results]", + "Started reindexing to destination index [test-outlier-detection-with-custom-params-results]", "Finished reindexing to destination index [test-outlier-detection-with-custom-params-results]", + "Started loading data", + "Started analyzing", + "Started writing results", "Finished analysis"); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 79e5ed0e2f0bd..8cc294b8b5fde 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -213,7 +213,8 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF task.setReindexingFinished(); auditor.info( config.getId(), - Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex())); + Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex(), + reindexResponse.getTook())); startAnalytics(task, config); }, error -> task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage()) @@ -233,9 +234,12 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF final ThreadContext threadContext = client.threadPool().getThreadContext(); final Supplier supplier = threadContext.newRestorableContext(false); try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(ML_ORIGIN)) { + LOGGER.info("[{}] Started reindexing", config.getId()); Task reindexTask = client.executeLocally(ReindexAction.INSTANCE, reindexRequest, new ContextPreservingActionListener<>(supplier, reindexCompletedListener)); task.setReindexingTaskId(reindexTask.getId()); + auditor.info(config.getId(), + Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING, config.getDest().getIndex())); } }, reindexCompletedListener::onFailure diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 0ca7474f49e65..b967e1d47118c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -147,6 +147,9 @@ private BytesReference getModelState(DataFrameAnalyticsConfig config) { } private void processData(DataFrameAnalyticsTask task, ProcessContext processContext, BytesReference state) { + LOGGER.info("[{}] Started loading data", processContext.config.getId()); + auditor.info(processContext.config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA)); + DataFrameAnalyticsConfig config = processContext.config; DataFrameDataExtractor dataExtractor = processContext.dataExtractor.get(); AnalyticsProcess process = processContext.process.get(); @@ -159,6 +162,9 @@ private void processData(DataFrameAnalyticsTask task, ProcessContext processCont restoreState(task, config, state, process); + LOGGER.info("[{}] Started analyzing", processContext.config.getId()); + auditor.info(processContext.config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING)); + LOGGER.info("[{}] Waiting for result processor to complete", config.getId()); resultProcessor.awaitForCompletion(); processContext.setFailureReason(resultProcessor.getFailure()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java index cd990ca11ab33..ab95e93936b49 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig; import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition; import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.core.security.user.XPackUser; @@ -120,6 +121,10 @@ public void process(AnalyticsProcess process) { AnalyticsResult result = iterator.next(); processResult(result, resultsJoiner); if (result.getRowResults() != null) { + if (processedRows == 0) { + LOGGER.info("[{}] Started writing results", analytics.getId()); + auditor.info(analytics.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS)); + } processedRows++; updateResultsProgress(processedRows >= totalRows ? 100 : (int) (processedRows * 100.0 / totalRows)); }