diff --git a/x-pack/plugins/ml/common/types/modules.ts b/x-pack/plugins/ml/common/types/modules.ts index e61ff9972d601e..b476762f6efca5 100644 --- a/x-pack/plugins/ml/common/types/modules.ts +++ b/x-pack/plugins/ml/common/types/modules.ts @@ -90,8 +90,14 @@ export interface DataRecognizerConfigResponse { }; } -export type GeneralOverride = any; - export type JobOverride = Partial; +export type GeneralJobsOverride = Omit; +export type JobSpecificOverride = JobOverride & { job_id: Job['job_id'] }; + +export function isGeneralJobOverride(override: JobOverride): override is GeneralJobsOverride { + return override.job_id === undefined; +} + +export type GeneralDatafeedsOverride = Partial>; export type DatafeedOverride = Partial; diff --git a/x-pack/plugins/ml/public/application/jobs/new_job/recognize/page.tsx b/x-pack/plugins/ml/public/application/jobs/new_job/recognize/page.tsx index 50c35ec426acb0..9b76b9be9bf45e 100644 --- a/x-pack/plugins/ml/public/application/jobs/new_job/recognize/page.tsx +++ b/x-pack/plugins/ml/public/application/jobs/new_job/recognize/page.tsx @@ -172,6 +172,7 @@ export const Page: FC = ({ moduleId, existingGroupIds }) => { startDatafeed: startDatafeedAfterSave, ...(jobOverridesPayload !== null ? { jobOverrides: jobOverridesPayload } : {}), ...resultTimeRange, + estimateModelMemory: false, }); const { datafeeds: datafeedsResponse, jobs: jobsResponse, kibana: kibanaResponse } = response; diff --git a/x-pack/plugins/ml/public/application/services/ml_api_service/index.ts b/x-pack/plugins/ml/public/application/services/ml_api_service/index.ts index cd4a97bd10ed4c..df59678452e2fe 100644 --- a/x-pack/plugins/ml/public/application/services/ml_api_service/index.ts +++ b/x-pack/plugins/ml/public/application/services/ml_api_service/index.ts @@ -367,6 +367,7 @@ export const ml = { start, end, jobOverrides, + estimateModelMemory, }: { moduleId: string; prefix?: string; @@ -378,6 +379,7 @@ export const ml = { start?: number; end?: number; jobOverrides?: Array>; + estimateModelMemory?: boolean; }) { const body = JSON.stringify({ prefix, @@ -389,6 +391,7 @@ export const ml = { start, end, jobOverrides, + estimateModelMemory, }); return http({ diff --git a/x-pack/plugins/ml/server/models/calculate_model_memory_limit/calculate_model_memory_limit.ts b/x-pack/plugins/ml/server/models/calculate_model_memory_limit/calculate_model_memory_limit.ts index c97bbe07fffda1..cd61dd9eddcdd7 100644 --- a/x-pack/plugins/ml/server/models/calculate_model_memory_limit/calculate_model_memory_limit.ts +++ b/x-pack/plugins/ml/server/models/calculate_model_memory_limit/calculate_model_memory_limit.ts @@ -6,6 +6,7 @@ import numeral from '@elastic/numeral'; import { APICaller } from 'kibana/server'; +import { MLCATEGORY } from '../../../common/constants/field_types'; import { AnalysisConfig } from '../../../common/types/anomaly_detection_jobs'; import { fieldsServiceProvider } from '../fields_service'; @@ -34,92 +35,96 @@ export interface ModelMemoryEstimate { /** * Retrieves overall and max bucket cardinalities. */ -async function getCardinalities( - callAsCurrentUser: APICaller, - analysisConfig: AnalysisConfig, - indexPattern: string, - query: any, - timeFieldName: string, - earliestMs: number, - latestMs: number -): Promise<{ - overallCardinality: { [key: string]: number }; - maxBucketCardinality: { [key: string]: number }; -}> { - /** - * Fields not involved in cardinality check - */ - const excludedKeywords = new Set( - /** - * The keyword which is used to mean the output of categorization, - * so it will have cardinality zero in the actual input data. - */ - 'mlcategory' - ); - +const cardinalityCheckProvider = (callAsCurrentUser: APICaller) => { const fieldsService = fieldsServiceProvider(callAsCurrentUser); - const { detectors, influencers, bucket_span: bucketSpan } = analysisConfig; - - let overallCardinality = {}; - let maxBucketCardinality = {}; - const overallCardinalityFields: Set = detectors.reduce( - ( - acc, - { - by_field_name: byFieldName, - partition_field_name: partitionFieldName, - over_field_name: overFieldName, - } - ) => { - [byFieldName, partitionFieldName, overFieldName] - .filter(field => field !== undefined && field !== '' && !excludedKeywords.has(field)) - .forEach(key => { - acc.add(key as string); - }); - return acc; - }, - new Set() - ); - - const maxBucketFieldCardinalities: string[] = influencers.filter( - influencerField => - typeof influencerField === 'string' && - !excludedKeywords.has(influencerField) && - !!influencerField && - !overallCardinalityFields.has(influencerField) - ) as string[]; - - if (overallCardinalityFields.size > 0) { - overallCardinality = await fieldsService.getCardinalityOfFields( - indexPattern, - [...overallCardinalityFields], - query, - timeFieldName, - earliestMs, - latestMs + return async ( + analysisConfig: AnalysisConfig, + indexPattern: string, + query: any, + timeFieldName: string, + earliestMs: number, + latestMs: number + ): Promise<{ + overallCardinality: { [key: string]: number }; + maxBucketCardinality: { [key: string]: number }; + }> => { + /** + * Fields not involved in cardinality check + */ + const excludedKeywords = new Set( + /** + * The keyword which is used to mean the output of categorization, + * so it will have cardinality zero in the actual input data. + */ + MLCATEGORY ); - } - if (maxBucketFieldCardinalities.length > 0) { - maxBucketCardinality = await fieldsService.getMaxBucketCardinalities( - indexPattern, - maxBucketFieldCardinalities, - query, - timeFieldName, - earliestMs, - latestMs, - bucketSpan + const { detectors, influencers, bucket_span: bucketSpan } = analysisConfig; + + let overallCardinality = {}; + let maxBucketCardinality = {}; + + // Get fields required for the model memory estimation + const overallCardinalityFields: Set = detectors.reduce( + ( + acc, + { + by_field_name: byFieldName, + partition_field_name: partitionFieldName, + over_field_name: overFieldName, + } + ) => { + [byFieldName, partitionFieldName, overFieldName] + .filter(field => field !== undefined && field !== '' && !excludedKeywords.has(field)) + .forEach(key => { + acc.add(key as string); + }); + return acc; + }, + new Set() ); - } - return { - overallCardinality, - maxBucketCardinality, + const maxBucketFieldCardinalities: string[] = influencers.filter( + influencerField => + !!influencerField && + !excludedKeywords.has(influencerField) && + !overallCardinalityFields.has(influencerField) + ) as string[]; + + if (overallCardinalityFields.size > 0) { + overallCardinality = await fieldsService.getCardinalityOfFields( + indexPattern, + [...overallCardinalityFields], + query, + timeFieldName, + earliestMs, + latestMs + ); + } + + if (maxBucketFieldCardinalities.length > 0) { + maxBucketCardinality = await fieldsService.getMaxBucketCardinalities( + indexPattern, + maxBucketFieldCardinalities, + query, + timeFieldName, + earliestMs, + latestMs, + bucketSpan + ); + } + + return { + overallCardinality, + maxBucketCardinality, + }; }; -} +}; export function calculateModelMemoryLimitProvider(callAsCurrentUser: APICaller) { + const getCardinalities = cardinalityCheckProvider(callAsCurrentUser); + /** * Retrieves an estimated size of the model memory limit used in the job config * based on the cardinality of the fields being used to split the data @@ -145,7 +150,6 @@ export function calculateModelMemoryLimitProvider(callAsCurrentUser: APICaller) } const { overallCardinality, maxBucketCardinality } = await getCardinalities( - callAsCurrentUser, analysisConfig, indexPattern, query, diff --git a/x-pack/plugins/ml/server/models/data_recognizer/data_recognizer.ts b/x-pack/plugins/ml/server/models/data_recognizer/data_recognizer.ts index a54c2f22a79515..824f9cc57982c4 100644 --- a/x-pack/plugins/ml/server/models/data_recognizer/data_recognizer.ts +++ b/x-pack/plugins/ml/server/models/data_recognizer/data_recognizer.ts @@ -7,10 +7,12 @@ import fs from 'fs'; import Boom from 'boom'; import numeral from '@elastic/numeral'; -import { CallAPIOptions, APICaller, SavedObjectsClientContract } from 'kibana/server'; +import { APICaller, SavedObjectsClientContract } from 'kibana/server'; +import moment from 'moment'; import { IndexPatternAttributes } from 'src/plugins/data/server'; import { merge } from 'lodash'; -import { CombinedJobWithStats } from '../../../common/types/anomaly_detection_jobs'; +import { AnalysisLimits, CombinedJobWithStats } from '../../../common/types/anomaly_detection_jobs'; +import { MlInfoResponse } from '../../../common/types/ml_server_info'; import { KibanaObjects, ModuleDataFeed, @@ -18,14 +20,19 @@ import { Module, JobOverride, DatafeedOverride, - GeneralOverride, + GeneralJobsOverride, DatafeedResponse, JobResponse, KibanaObjectResponse, DataRecognizerConfigResponse, + GeneralDatafeedsOverride, + JobSpecificOverride, + isGeneralJobOverride, } from '../../../common/types/modules'; import { getLatestDataOrBucketTimestamp, prefixDatafeedId } from '../../../common/util/job_utils'; import { mlLog } from '../../client/log'; +import { calculateModelMemoryLimitProvider } from '../calculate_model_memory_limit'; +import { fieldsServiceProvider } from '../fields_service'; import { jobServiceProvider } from '../job_service'; import { resultsServiceProvider } from '../results_service'; @@ -107,18 +114,15 @@ export class DataRecognizer { modulesDir = `${__dirname}/modules`; indexPatternName: string = ''; indexPatternId: string | undefined = undefined; - savedObjectsClient: SavedObjectsClientContract; + /** + * List of the module jobs that require model memory estimation + */ + jobsForModelMemoryEstimation: ModuleJob[] = []; - callAsCurrentUser: ( - endpoint: string, - clientParams?: Record, - options?: CallAPIOptions - ) => Promise; - - constructor(callAsCurrentUser: APICaller, savedObjectsClient: SavedObjectsClientContract) { - this.callAsCurrentUser = callAsCurrentUser; - this.savedObjectsClient = savedObjectsClient; - } + constructor( + private callAsCurrentUser: APICaller, + private savedObjectsClient: SavedObjectsClientContract + ) {} // list all directories under the given directory async listDirs(dirName: string): Promise { @@ -367,16 +371,17 @@ export class DataRecognizer { // if any of the savedObjects already exist, they will not be overwritten. async setupModuleItems( moduleId: string, - jobPrefix: string, - groups: string[], - indexPatternName: string, - query: any, - useDedicatedIndex: boolean, - startDatafeed: boolean, - start: number, - end: number, - jobOverrides: JobOverride[], - datafeedOverrides: DatafeedOverride[] + jobPrefix?: string, + groups?: string[], + indexPatternName?: string, + query?: any, + useDedicatedIndex?: boolean, + startDatafeed?: boolean, + start?: number, + end?: number, + jobOverrides?: JobOverride | JobOverride[], + datafeedOverrides?: DatafeedOverride | DatafeedOverride[], + estimateModelMemory?: boolean ) { // load the config from disk const moduleConfig = await this.getModule(moduleId, jobPrefix); @@ -418,11 +423,13 @@ export class DataRecognizer { savedObjects: [] as KibanaObjectResponse[], }; + this.jobsForModelMemoryEstimation = moduleConfig.jobs; + this.applyJobConfigOverrides(moduleConfig, jobOverrides, jobPrefix); this.applyDatafeedConfigOverrides(moduleConfig, datafeedOverrides, jobPrefix); this.updateDatafeedIndices(moduleConfig); this.updateJobUrlIndexPatterns(moduleConfig); - await this.updateModelMemoryLimits(moduleConfig); + await this.updateModelMemoryLimits(moduleConfig, estimateModelMemory, start, end); // create the jobs if (moduleConfig.jobs && moduleConfig.jobs.length) { @@ -689,8 +696,8 @@ export class DataRecognizer { async startDatafeeds( datafeeds: ModuleDataFeed[], - start: number, - end: number + start?: number, + end?: number ): Promise<{ [key: string]: DatafeedResponse }> { const results = {} as { [key: string]: DatafeedResponse }; for (const datafeed of datafeeds) { @@ -933,28 +940,117 @@ export class DataRecognizer { } } - // ensure the model memory limit for each job is not greater than - // the max model memory setting for the cluster - async updateModelMemoryLimits(moduleConfig: Module) { - const { limits } = await this.callAsCurrentUser('ml.info'); + /** + * Provides a time range of the last 3 months of data + */ + async getFallbackTimeRange( + timeField: string, + query?: any + ): Promise<{ start: number; end: number }> { + const fieldsService = fieldsServiceProvider(this.callAsCurrentUser); + + const timeFieldRange = await fieldsService.getTimeFieldRange( + this.indexPatternName, + timeField, + query + ); + + return { + start: timeFieldRange.end.epoch - moment.duration(3, 'months').asMilliseconds(), + end: timeFieldRange.end.epoch, + }; + } + + /** + * Ensure the model memory limit for each job is not greater than + * the max model memory setting for the cluster + */ + async updateModelMemoryLimits( + moduleConfig: Module, + estimateMML: boolean = false, + start?: number, + end?: number + ) { + if (!Array.isArray(moduleConfig.jobs)) { + return; + } + + if (estimateMML && this.jobsForModelMemoryEstimation.length > 0) { + const calculateModelMemoryLimit = calculateModelMemoryLimitProvider(this.callAsCurrentUser); + const query = moduleConfig.query ?? null; + + // Checks if all jobs in the module have the same time field configured + const isSameTimeFields = this.jobsForModelMemoryEstimation.every( + job => + job.config.data_description.time_field === + this.jobsForModelMemoryEstimation[0].config.data_description.time_field + ); + + if (isSameTimeFields && (start === undefined || end === undefined)) { + // In case of time range is not provided and the time field is the same + // set the fallback range for all jobs + const { start: fallbackStart, end: fallbackEnd } = await this.getFallbackTimeRange( + this.jobsForModelMemoryEstimation[0].config.data_description.time_field, + query + ); + start = fallbackStart; + end = fallbackEnd; + } + + for (const job of this.jobsForModelMemoryEstimation) { + let earliestMs = start; + let latestMs = end; + if (earliestMs === undefined || latestMs === undefined) { + const timeFieldRange = await this.getFallbackTimeRange( + job.config.data_description.time_field, + query + ); + earliestMs = timeFieldRange.start; + latestMs = timeFieldRange.end; + } + + const { modelMemoryLimit } = await calculateModelMemoryLimit( + job.config.analysis_config, + this.indexPatternName, + query, + job.config.data_description.time_field, + earliestMs, + latestMs + ); + + if (!job.config.analysis_limits) { + job.config.analysis_limits = {} as AnalysisLimits; + } + + job.config.analysis_limits.model_memory_limit = modelMemoryLimit; + } + } + + const { limits } = await this.callAsCurrentUser('ml.info'); const maxMml = limits.max_model_memory_limit; - if (maxMml !== undefined) { - // @ts-ignore - const maxBytes: number = numeral(maxMml.toUpperCase()).value(); - - if (Array.isArray(moduleConfig.jobs)) { - moduleConfig.jobs.forEach(job => { - const mml = job.config?.analysis_limits?.model_memory_limit; - if (mml !== undefined) { - // @ts-ignore - const mmlBytes: number = numeral(mml.toUpperCase()).value(); - if (mmlBytes > maxBytes) { - // if the job's mml is over the max, - // so set the jobs mml to be the max - job.config.analysis_limits!.model_memory_limit = maxMml; - } + + if (!maxMml) { + return; + } + + // @ts-ignore + const maxBytes: number = numeral(maxMml.toUpperCase()).value(); + + for (const job of moduleConfig.jobs) { + const mml = job.config?.analysis_limits?.model_memory_limit; + if (mml !== undefined) { + // @ts-ignore + const mmlBytes: number = numeral(mml.toUpperCase()).value(); + if (mmlBytes > maxBytes) { + // if the job's mml is over the max, + // so set the jobs mml to be the max + + if (!job.config.analysis_limits) { + job.config.analysis_limits = {} as AnalysisLimits; } - }); + + job.config.analysis_limits.model_memory_limit = maxMml; + } } } } @@ -975,7 +1071,11 @@ export class DataRecognizer { return false; } - applyJobConfigOverrides(moduleConfig: Module, jobOverrides: JobOverride[], jobPrefix = '') { + applyJobConfigOverrides( + moduleConfig: Module, + jobOverrides?: JobOverride | JobOverride[], + jobPrefix = '' + ) { if (jobOverrides === undefined || jobOverrides === null) { return; } @@ -993,17 +1093,26 @@ export class DataRecognizer { // separate all the overrides. // the overrides which don't contain a job id will be applied to all jobs in the module - const generalOverrides: GeneralOverride[] = []; - const jobSpecificOverrides: JobOverride[] = []; + const generalOverrides: GeneralJobsOverride[] = []; + const jobSpecificOverrides: JobSpecificOverride[] = []; overrides.forEach(override => { - if (override.job_id === undefined) { + if (isGeneralJobOverride(override)) { generalOverrides.push(override); } else { jobSpecificOverrides.push(override); } }); + if (generalOverrides.some(override => !!override.analysis_limits?.model_memory_limit)) { + this.jobsForModelMemoryEstimation = []; + } else { + this.jobsForModelMemoryEstimation = moduleConfig.jobs.filter(job => { + const override = jobSpecificOverrides.find(o => `${jobPrefix}${o.job_id}` === job.id); + return override?.analysis_limits?.model_memory_limit === undefined; + }); + } + function processArrayValues(source: any, update: any) { if (typeof source !== 'object' || typeof update !== 'object') { return; @@ -1052,7 +1161,7 @@ export class DataRecognizer { applyDatafeedConfigOverrides( moduleConfig: Module, - datafeedOverrides: DatafeedOverride | DatafeedOverride[], + datafeedOverrides?: DatafeedOverride | DatafeedOverride[], jobPrefix = '' ) { if (datafeedOverrides !== undefined && datafeedOverrides !== null) { @@ -1069,7 +1178,7 @@ export class DataRecognizer { // separate all the overrides. // the overrides which don't contain a datafeed id or a job id will be applied to all jobs in the module - const generalOverrides: GeneralOverride[] = []; + const generalOverrides: GeneralDatafeedsOverride[] = []; const datafeedSpecificOverrides: DatafeedOverride[] = []; overrides.forEach(o => { if (o.datafeed_id === undefined && o.job_id === undefined) { diff --git a/x-pack/plugins/ml/server/models/fields_service/fields_aggs_cache.ts b/x-pack/plugins/ml/server/models/fields_service/fields_aggs_cache.ts new file mode 100644 index 00000000000000..cdaefe6fdeed7c --- /dev/null +++ b/x-pack/plugins/ml/server/models/fields_service/fields_aggs_cache.ts @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { pick } from 'lodash'; + +/** + * Cached aggregation types + */ +type AggType = 'overallCardinality' | 'maxBucketCardinality'; + +type CacheStorage = { [key in AggType]: { [field: string]: number } }; + +/** + * Caches cardinality fields values to avoid + * unnecessary aggregations on elasticsearch + */ +export const initCardinalityFieldsCache = () => { + const cardinalityCache = new Map(); + + return { + /** + * Gets requested values from cache + */ + getValues( + indexPatternName: string | string[], + timeField: string, + earliestMs: number, + latestMs: number, + aggType: AggType, + fieldNames: string[] + ): CacheStorage[AggType] | null { + const cacheKey = indexPatternName + timeField + earliestMs + latestMs; + const cached = cardinalityCache.get(cacheKey); + if (!cached) { + return null; + } + return pick(cached[aggType], fieldNames); + }, + /** + * Extends cache with provided values + */ + updateValues( + indexPatternName: string | string[], + timeField: string, + earliestMs: number, + latestMs: number, + update: Partial + ): void { + const cacheKey = indexPatternName + timeField + earliestMs + latestMs; + const cachedValues = cardinalityCache.get(cacheKey); + if (cachedValues === undefined) { + cardinalityCache.set(cacheKey, { + overallCardinality: update.overallCardinality ?? {}, + maxBucketCardinality: update.maxBucketCardinality ?? {}, + }); + return; + } + + Object.assign(cachedValues.overallCardinality, update.overallCardinality); + Object.assign(cachedValues.maxBucketCardinality, update.maxBucketCardinality); + }, + }; +}; diff --git a/x-pack/plugins/ml/server/models/fields_service/fields_service.ts b/x-pack/plugins/ml/server/models/fields_service/fields_service.ts index d16984abc5d2a9..567c5d2afb7dea 100644 --- a/x-pack/plugins/ml/server/models/fields_service/fields_service.ts +++ b/x-pack/plugins/ml/server/models/fields_service/fields_service.ts @@ -7,12 +7,15 @@ import Boom from 'boom'; import { APICaller } from 'kibana/server'; import { parseInterval } from '../../../common/util/parse_interval'; +import { initCardinalityFieldsCache } from './fields_aggs_cache'; /** * Service for carrying out queries to obtain data * specific to fields in Elasticsearch indices. */ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { + const fieldsAggsCache = initCardinalityFieldsCache(); + /** * Gets aggregatable fields. */ @@ -58,6 +61,23 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { return {}; } + const cachedValues = + fieldsAggsCache.getValues( + index, + timeFieldName, + earliestMs, + latestMs, + 'overallCardinality', + fieldNames + ) ?? {}; + + // No need to perform aggregation over the cached fields + const fieldsToAgg = aggregatableFields.filter(field => !cachedValues.hasOwnProperty(field)); + + if (fieldsToAgg.length === 0) { + return cachedValues; + } + // Build the criteria to use in the bool filter part of the request. // Add criteria for the time range and the datafeed config query. const mustCriteria = [ @@ -76,7 +96,7 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { mustCriteria.push(query); } - const aggs = aggregatableFields.reduce((obj, field) => { + const aggs = fieldsToAgg.reduce((obj, field) => { obj[field] = { cardinality: { field } }; return obj; }, {} as { [field: string]: { cardinality: { field: string } } }); @@ -105,53 +125,63 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { return {}; } - return aggregatableFields.reduce((obj, field) => { + const aggResult = fieldsToAgg.reduce((obj, field) => { obj[field] = (aggregations[field] || { value: 0 }).value; return obj; }, {} as { [field: string]: number }); + + fieldsAggsCache.updateValues(index, timeFieldName, earliestMs, latestMs, { + overallCardinality: aggResult, + }); + + return { + ...cachedValues, + ...aggResult, + }; } - function getTimeFieldRange( + /** + * Gets time boundaries of the index data based on the provided time field. + */ + async function getTimeFieldRange( index: string[] | string, timeFieldName: string, query: any - ): Promise { - return new Promise((resolve, reject) => { - const obj = { success: true, start: { epoch: 0, string: '' }, end: { epoch: 0, string: '' } }; - - callAsCurrentUser('search', { - index, - size: 0, - body: { - query, - aggs: { - earliest: { - min: { - field: timeFieldName, - }, + ): Promise<{ + success: boolean; + start: { epoch: number; string: string }; + end: { epoch: number; string: string }; + }> { + const obj = { success: true, start: { epoch: 0, string: '' }, end: { epoch: 0, string: '' } }; + + const resp = await callAsCurrentUser('search', { + index, + size: 0, + body: { + ...(query ? { query } : {}), + aggs: { + earliest: { + min: { + field: timeFieldName, }, - latest: { - max: { - field: timeFieldName, - }, + }, + latest: { + max: { + field: timeFieldName, }, }, }, - }) - .then(resp => { - if (resp.aggregations && resp.aggregations.earliest && resp.aggregations.latest) { - obj.start.epoch = resp.aggregations.earliest.value; - obj.start.string = resp.aggregations.earliest.value_as_string; - - obj.end.epoch = resp.aggregations.latest.value; - obj.end.string = resp.aggregations.latest.value_as_string; - } - resolve(obj); - }) - .catch(resp => { - reject(resp); - }); + }, }); + + if (resp.aggregations && resp.aggregations.earliest && resp.aggregations.latest) { + obj.start.epoch = resp.aggregations.earliest.value; + obj.start.string = resp.aggregations.earliest.value_as_string; + + obj.end.epoch = resp.aggregations.latest.value; + obj.end.string = resp.aggregations.latest.value_as_string; + } + return obj; } /** @@ -213,6 +243,23 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { return {}; } + const cachedValues = + fieldsAggsCache.getValues( + index, + timeFieldName, + earliestMs, + latestMs, + 'maxBucketCardinality', + fieldNames + ) ?? {}; + + // No need to perform aggregation over the cached fields + const fieldsToAgg = aggregatableFields.filter(field => !cachedValues.hasOwnProperty(field)); + + if (fieldsToAgg.length === 0) { + return cachedValues; + } + const { start, end } = getSafeTimeRange(earliestMs, latestMs, interval); const mustCriteria = [ @@ -238,7 +285,7 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { const getSafeAggName = (field: string) => field.replace(/\W/g, ''); const getMaxBucketAggKey = (field: string) => `max_bucket_${field}`; - const fieldsCardinalityAggs = aggregatableFields.reduce((obj, field) => { + const fieldsCardinalityAggs = fieldsToAgg.reduce((obj, field) => { obj[getSafeAggName(field)] = { cardinality: { field } }; return obj; }, {} as { [field: string]: { cardinality: { field: string } } }); @@ -279,13 +326,18 @@ export function fieldsServiceProvider(callAsCurrentUser: APICaller) { )?.aggregations; if (!aggregations) { - return {}; + return cachedValues; } - return aggregatableFields.reduce((obj, field) => { + const aggResult = fieldsToAgg.reduce((obj, field) => { obj[field] = (aggregations[getMaxBucketAggKey(field)] || { value: 0 }).value ?? 0; return obj; }, {} as { [field: string]: number }); + + return { + ...cachedValues, + ...aggResult, + }; } return { diff --git a/x-pack/plugins/ml/server/routes/modules.ts b/x-pack/plugins/ml/server/routes/modules.ts index 685119672a983b..358cd0ac2871cf 100644 --- a/x-pack/plugins/ml/server/routes/modules.ts +++ b/x-pack/plugins/ml/server/routes/modules.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { schema } from '@kbn/config-schema'; +import { schema, TypeOf } from '@kbn/config-schema'; import { RequestHandlerContext } from 'kibana/server'; import { DatafeedOverride, JobOverride } from '../../common/types/modules'; @@ -36,16 +36,17 @@ function getModule(context: RequestHandlerContext, moduleId: string) { function saveModuleItems( context: RequestHandlerContext, moduleId: string, - prefix: string, - groups: string[], - indexPatternName: string, - query: any, - useDedicatedIndex: boolean, - startDatafeed: boolean, - start: number, - end: number, - jobOverrides: JobOverride[], - datafeedOverrides: DatafeedOverride[] + prefix?: string, + groups?: string[], + indexPatternName?: string, + query?: any, + useDedicatedIndex?: boolean, + startDatafeed?: boolean, + start?: number, + end?: number, + jobOverrides?: JobOverride | JobOverride[], + datafeedOverrides?: DatafeedOverride | DatafeedOverride[], + estimateModelMemory?: boolean ) { const dr = new DataRecognizer( context.ml!.mlClient.callAsCurrentUser, @@ -62,7 +63,8 @@ function saveModuleItems( start, end, jobOverrides, - datafeedOverrides + datafeedOverrides, + estimateModelMemory ); } @@ -156,9 +158,7 @@ export function dataRecognizer({ router, mlLicense }: RouteInitialization) { { path: '/api/ml/modules/setup/{moduleId}', validate: { - params: schema.object({ - ...getModuleIdParamSchema(), - }), + params: schema.object(getModuleIdParamSchema()), body: setupModuleBodySchema, }, }, @@ -177,7 +177,8 @@ export function dataRecognizer({ router, mlLicense }: RouteInitialization) { end, jobOverrides, datafeedOverrides, - } = request.body; + estimateModelMemory, + } = request.body as TypeOf; const result = await saveModuleItems( context, @@ -191,7 +192,8 @@ export function dataRecognizer({ router, mlLicense }: RouteInitialization) { start, end, jobOverrides, - datafeedOverrides + datafeedOverrides, + estimateModelMemory ); return response.ok({ body: result }); @@ -214,9 +216,7 @@ export function dataRecognizer({ router, mlLicense }: RouteInitialization) { { path: '/api/ml/modules/jobs_exist/{moduleId}', validate: { - params: schema.object({ - ...getModuleIdParamSchema(), - }), + params: schema.object(getModuleIdParamSchema()), }, }, mlLicense.fullLicenseAPIGuard(async (context, request, response) => { diff --git a/x-pack/plugins/ml/server/routes/schemas/modules.ts b/x-pack/plugins/ml/server/routes/schemas/modules.ts index 46b7e53c22a053..98e3d80f0ff842 100644 --- a/x-pack/plugins/ml/server/routes/schemas/modules.ts +++ b/x-pack/plugins/ml/server/routes/schemas/modules.ts @@ -17,6 +17,11 @@ export const setupModuleBodySchema = schema.object({ end: schema.maybe(schema.number()), jobOverrides: schema.maybe(schema.any()), datafeedOverrides: schema.maybe(schema.any()), + /** + * Indicates whether an estimate of the model memory limit + * should be made by checking the cardinality of fields in the job configurations. + */ + estimateModelMemory: schema.maybe(schema.boolean()), }); export const getModuleIdParamSchema = (optional = false) => { diff --git a/x-pack/plugins/ml/server/shared_services/providers/modules.ts b/x-pack/plugins/ml/server/shared_services/providers/modules.ts index ffc977917ae46f..ec876273c2c33e 100644 --- a/x-pack/plugins/ml/server/shared_services/providers/modules.ts +++ b/x-pack/plugins/ml/server/shared_services/providers/modules.ts @@ -32,7 +32,8 @@ export interface ModulesProvider { start: number, end: number, jobOverrides: JobOverride[], - datafeedOverrides: DatafeedOverride[] + datafeedOverrides: DatafeedOverride[], + estimateModelMemory?: boolean ): Promise; }; } @@ -65,7 +66,8 @@ export function getModulesProvider(isFullLicense: LicenseCheck): ModulesProvider start: number, end: number, jobOverrides: JobOverride[], - datafeedOverrides: DatafeedOverride[] + datafeedOverrides: DatafeedOverride[], + estimateModelMemory?: boolean ) { const dr = dataRecognizerFactory(callAsCurrentUser, savedObjectsClient); return dr.setupModuleItems( @@ -79,7 +81,8 @@ export function getModulesProvider(isFullLicense: LicenseCheck): ModulesProvider start, end, jobOverrides, - datafeedOverrides + datafeedOverrides, + estimateModelMemory ); }, };