diff --git a/x-pack/plugins/reporting/server/core.ts b/x-pack/plugins/reporting/server/core.ts
index 95dc7586ad4a6b..25594e1c0140b7 100644
--- a/x-pack/plugins/reporting/server/core.ts
+++ b/x-pack/plugins/reporting/server/core.ts
@@ -23,7 +23,6 @@ import { HeadlessChromiumDriverFactory } from './browsers/chromium/driver_factor
 import { screenshotsObservableFactory } from './lib/screenshots';
 import { checkLicense, getExportTypesRegistry } from './lib';
 import { ESQueueInstance } from './lib/create_queue';
-import { EnqueueJobFn } from './lib/enqueue_job';
 import { ReportingStore } from './lib/store';
 
 export interface ReportingInternalSetup {
@@ -36,7 +35,6 @@ export interface ReportingInternalSetup {
 
 export interface ReportingInternalStart {
   browserDriverFactory: HeadlessChromiumDriverFactory;
-  enqueueJob: EnqueueJobFn;
   esqueue: ESQueueInstance;
   store: ReportingStore;
   savedObjects: SavedObjectsServiceStart;
@@ -115,7 +113,7 @@ export class ReportingCore {
   /*
    * Gives async access to the startDeps
   */
-  private async getPluginStartDeps() {
+  public async getPluginStartDeps() {
    if (this.pluginStartDeps) {
      return this.pluginStartDeps;
    }
@@ -131,10 +129,6 @@ export class ReportingCore {
    return (await this.getPluginStartDeps()).esqueue;
   }
 
-  public async getEnqueueJob() {
-    return (await this.getPluginStartDeps()).enqueueJob;
-  }
-
   public async getLicenseInfo() {
    const { licensing } = this.getPluginSetupDeps();
    return await licensing.license$
diff --git a/x-pack/plugins/reporting/server/export_types/csv/create_job.ts b/x-pack/plugins/reporting/server/export_types/csv/create_job.ts
index 5e8ce923a79e0b..252968e386b53f 100644
--- a/x-pack/plugins/reporting/server/export_types/csv/create_job.ts
+++ b/x-pack/plugins/reporting/server/export_types/csv/create_job.ts
@@ -5,10 +5,10 @@
  */
 
 import { cryptoFactory } from '../../lib';
-import { ESQueueCreateJobFn, ScheduleTaskFnFactory } from '../../types';
+import { CreateJobFn, ScheduleTaskFnFactory } from '../../types';
 import { JobParamsDiscoverCsv } from './types';
 
-export const scheduleTaskFnFactory: ScheduleTaskFnFactory<ESQueueCreateJobFn<
+export const scheduleTaskFnFactory: ScheduleTaskFnFactory<CreateJobFn<
   JobParamsDiscoverCsv
 >> = function createJobFactoryFn(reporting) {
   const config = reporting.getConfig();
diff --git a/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts b/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts
index 75070c06824e2d..5eeef0f9906dd4 100644
--- a/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts
+++ b/x-pack/plugins/reporting/server/export_types/csv/execute_job.test.ts
@@ -5,7 +5,7 @@
  */
 
 import nodeCrypto from '@elastic/node-crypto';
-import { IUiSettingsClient, ElasticsearchServiceSetup } from 'kibana/server';
+import { ElasticsearchServiceSetup, IUiSettingsClient } from 'kibana/server';
 // @ts-ignore
 import Puid from 'puid';
 import sinon from 'sinon';
@@ -20,8 +20,8 @@ import { CSV_BOM_CHARS } from '../../../common/constants';
 import { LevelLogger } from '../../lib';
 import { setFieldFormats } from '../../services';
 import { createMockReportingCore } from '../../test_helpers';
-import { ScheduledTaskParamsCSV } from './types';
 import { runTaskFnFactory } from './execute_job';
+import { ScheduledTaskParamsCSV } from './types';
 
 const delay = (ms: number) => new Promise((resolve) => setTimeout(() => resolve(), ms));
 
@@ -125,7 +125,7 @@ describe('CSV Execute Job', function () {
   describe('basic Elasticsearch call behavior', function () {
     it('should decrypt encrypted headers and pass to callAsCurrentUser', async function () {
-      const runTask = await
runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); await runTask( 'job456', getScheduledTaskParams({ @@ -145,7 +145,7 @@ describe('CSV Execute Job', function () { testBody: true, }; - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const job = getScheduledTaskParams({ headers: encryptedHeaders, fields: [], @@ -172,7 +172,7 @@ describe('CSV Execute Job', function () { _scroll_id: scrollId, }); callAsCurrentUserStub.onSecondCall().resolves(defaultElasticsearchResponse); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); await runTask( 'job456', getScheduledTaskParams({ @@ -190,7 +190,7 @@ describe('CSV Execute Job', function () { }); it('should not execute scroll if there are no hits from the search', async function () { - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); await runTask( 'job456', getScheduledTaskParams({ @@ -224,7 +224,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); await runTask( 'job456', getScheduledTaskParams({ @@ -263,7 +263,7 @@ describe('CSV Execute Job', function () { _scroll_id: lastScrollId, }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); await runTask( 'job456', getScheduledTaskParams({ @@ -295,7 +295,7 @@ describe('CSV Execute Job', function () { _scroll_id: lastScrollId, }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -322,7 +322,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -347,7 +347,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['=SUM(A1:A2)', 'two'], @@ -373,7 +373,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -399,7 +399,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['=SUM(A1:A2)', 'two'], @@ -425,7 +425,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = 
runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -452,7 +452,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -473,7 +473,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -496,7 +496,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -517,7 +517,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -533,7 +533,7 @@ describe('CSV Execute Job', function () { describe('Elasticsearch call errors', function () { it('should reject Promise if search call errors out', async function () { callAsCurrentUserStub.rejects(new Error()); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: [], @@ -552,7 +552,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); callAsCurrentUserStub.onSecondCall().rejects(new Error()); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: [], @@ -573,7 +573,7 @@ describe('CSV Execute Job', function () { _scroll_id: undefined, }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: [], @@ -592,7 +592,7 @@ describe('CSV Execute Job', function () { _scroll_id: undefined, }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: [], @@ -618,7 +618,7 @@ describe('CSV Execute Job', function () { _scroll_id: undefined, }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: [], @@ -644,7 +644,7 @@ describe('CSV Execute Job', function () { _scroll_id: undefined, }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: [], @@ -678,7 +678,7 @@ describe('CSV Execute 
Job', function () { }); it('should stop calling Elasticsearch when cancellationToken.cancel is called', async function () { - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); runTask( 'job345', getScheduledTaskParams({ @@ -697,7 +697,7 @@ describe('CSV Execute Job', function () { }); it(`shouldn't call clearScroll if it never got a scrollId`, async function () { - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); runTask( 'job345', getScheduledTaskParams({ @@ -715,7 +715,7 @@ describe('CSV Execute Job', function () { }); it('should call clearScroll if it got a scrollId', async function () { - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); runTask( 'job345', getScheduledTaskParams({ @@ -737,7 +737,7 @@ describe('CSV Execute Job', function () { describe('csv content', function () { it('should write column headers to output, even if there are no results', async function () { - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -749,7 +749,7 @@ describe('CSV Execute Job', function () { it('should use custom uiSettings csv:separator for header', async function () { mockUiSettingsClient.get.withArgs(CSV_SEPARATOR_SETTING).returns(';'); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -761,7 +761,7 @@ describe('CSV Execute Job', function () { it('should escape column headers if uiSettings csv:quoteValues is true', async function () { mockUiSettingsClient.get.withArgs(CSV_QUOTE_VALUES_SETTING).returns(true); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one and a half', 'two', 'three-and-four', 'five & six'], @@ -773,7 +773,7 @@ describe('CSV Execute Job', function () { it(`shouldn't escape column headers if uiSettings csv:quoteValues is false`, async function () { mockUiSettingsClient.get.withArgs(CSV_QUOTE_VALUES_SETTING).returns(false); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one and a half', 'two', 'three-and-four', 'five & six'], @@ -784,7 +784,7 @@ describe('CSV Execute Job', function () { }); it('should write column headers to output, when there are results', async function () { - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); callAsCurrentUserStub.onFirstCall().resolves({ hits: { hits: [{ one: '1', two: '2' }], @@ -798,13 +798,14 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); const { content } = await runTask('job123', jobParams, cancellationToken); - const lines = content.split('\n'); + expect(content).not.toBe(null); + const lines = content!.split('\n'); const headerLine = 
lines[0]; expect(headerLine).toBe('one,two'); }); it('should use comma separated values of non-nested fields from _source', async function () { - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); callAsCurrentUserStub.onFirstCall().resolves({ hits: { hits: [{ _source: { one: 'foo', two: 'bar' } }], @@ -819,13 +820,14 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); const { content } = await runTask('job123', jobParams, cancellationToken); - const lines = content.split('\n'); + expect(content).not.toBe(null); + const lines = content!.split('\n'); const valuesLine = lines[1]; expect(valuesLine).toBe('foo,bar'); }); it('should concatenate the hits from multiple responses', async function () { - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); callAsCurrentUserStub.onFirstCall().resolves({ hits: { hits: [{ _source: { one: 'foo', two: 'bar' } }], @@ -846,14 +848,15 @@ describe('CSV Execute Job', function () { searchRequest: { index: null, body: null }, }); const { content } = await runTask('job123', jobParams, cancellationToken); - const lines = content.split('\n'); + expect(content).not.toBe(null); + const lines = content!.split('\n'); expect(lines[1]).toBe('foo,bar'); expect(lines[2]).toBe('baz,qux'); }); it('should use field formatters to format fields', async function () { - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); callAsCurrentUserStub.onFirstCall().resolves({ hits: { hits: [{ _source: { one: 'foo', two: 'bar' } }], @@ -877,7 +880,8 @@ describe('CSV Execute Job', function () { }, }); const { content } = await runTask('job123', jobParams, cancellationToken); - const lines = content.split('\n'); + expect(content).not.toBe(null); + const lines = content!.split('\n'); expect(lines[1]).toBe('FOO,bar'); }); @@ -889,13 +893,13 @@ describe('CSV Execute Job', function () { // tests use these 'simple' characters to make the math easier describe('when only the headers exceed the maxSizeBytes', function () { - let content: string; - let maxSizeReached: boolean; + let content: string | null; + let maxSizeReached: boolean | undefined; beforeEach(async function () { configGetStub.withArgs('csv', 'maxSizeBytes').returns(1); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -919,13 +923,13 @@ describe('CSV Execute Job', function () { }); describe('when headers are equal to maxSizeBytes', function () { - let content: string; - let maxSizeReached: boolean; + let content: string | null; + let maxSizeReached: boolean | undefined; beforeEach(async function () { configGetStub.withArgs('csv', 'maxSizeBytes').returns(9); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -949,8 +953,8 @@ describe('CSV Execute Job', function () { }); describe('when the data exceeds the maxSizeBytes', function () { - let content: string; - let maxSizeReached: boolean; + let content: string | null; + let maxSizeReached: boolean | undefined; 
beforeEach(async function () { configGetStub.withArgs('csv', 'maxSizeBytes').returns(9); @@ -962,7 +966,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -987,8 +991,8 @@ describe('CSV Execute Job', function () { }); describe('when headers and data equal the maxSizeBytes', function () { - let content: string; - let maxSizeReached: boolean; + let content: string | null; + let maxSizeReached: boolean | undefined; beforeEach(async function () { mockReportingCore.getUiSettingsServiceFactory = () => @@ -1002,7 +1006,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -1039,7 +1043,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -1065,7 +1069,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], @@ -1091,7 +1095,7 @@ describe('CSV Execute Job', function () { _scroll_id: 'scrollId', }); - const runTask = await runTaskFnFactory(mockReportingCore, mockLogger); + const runTask = runTaskFnFactory(mockReportingCore, mockLogger); const jobParams = getScheduledTaskParams({ headers: encryptedHeaders, fields: ['one', 'two'], diff --git a/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts b/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts index f0c41a6a49703d..802f4a81777c5e 100644 --- a/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts +++ b/x-pack/plugins/reporting/server/export_types/csv/execute_job.ts @@ -10,7 +10,7 @@ import Hapi from 'hapi'; import { KibanaRequest } from '../../../../../../src/core/server'; import { CONTENT_TYPE_CSV, CSV_JOB_TYPE } from '../../../common/constants'; import { cryptoFactory, LevelLogger } from '../../lib'; -import { ESQueueWorkerExecuteFn, RunTaskFnFactory } from '../../types'; +import { WorkerExecuteFn, RunTaskFnFactory } from '../../types'; import { ScheduledTaskParamsCSV } from './types'; import { createGenerateCsv } from './generate_csv'; @@ -54,7 +54,7 @@ const getRequest = async (headers: string | undefined, crypto: Crypto, logger: L } as Hapi.Request); }; -export const runTaskFnFactory: RunTaskFnFactory> = function executeJobFactoryFn(reporting, parentLogger) { const config = reporting.getConfig(); diff --git a/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts b/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts index 8da27100ac31cf..06aa2434afc3f4 100644 --- a/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts +++ b/x-pack/plugins/reporting/server/export_types/csv/generate_csv/index.ts @@ -113,6 +113,10 @@ export function 
createGenerateCsv(logger: LevelLogger) {
         break;
       }
 
+      if (cancellationToken.isCancelled()) {
+        break;
+      }
+
       const flattened = flattenHit(hit);
       const rows = formatCsvValues(flattened);
       const rowsHaveFormulas =
diff --git a/x-pack/plugins/reporting/server/export_types/csv/index.ts b/x-pack/plugins/reporting/server/export_types/csv/index.ts
index dffc874831dc23..4bca42e0661e5c 100644
--- a/x-pack/plugins/reporting/server/export_types/csv/index.ts
+++ b/x-pack/plugins/reporting/server/export_types/csv/index.ts
@@ -13,17 +13,17 @@ import {
   LICENSE_TYPE_TRIAL,
 } from '../../../common/constants';
 import { CSV_JOB_TYPE as jobType } from '../../../constants';
-import { ESQueueCreateJobFn, ESQueueWorkerExecuteFn, ExportTypeDefinition } from '../../types';
-import { metadata } from './metadata';
+import { CreateJobFn, WorkerExecuteFn, ExportTypeDefinition } from '../../types';
 import { scheduleTaskFnFactory } from './create_job';
 import { runTaskFnFactory } from './execute_job';
+import { metadata } from './metadata';
 import { JobParamsDiscoverCsv, ScheduledTaskParamsCSV } from './types';
 
 export const getExportType = (): ExportTypeDefinition<
   JobParamsDiscoverCsv,
-  ESQueueCreateJobFn,
+  CreateJobFn,
   ScheduledTaskParamsCSV,
-  ESQueueWorkerExecuteFn
+  WorkerExecuteFn
 > => ({
   ...metadata,
   jobType,
diff --git a/x-pack/plugins/reporting/server/export_types/csv/types.d.ts b/x-pack/plugins/reporting/server/export_types/csv/types.d.ts
index 9e86a5bb254a31..e0d09d04a3d3a4 100644
--- a/x-pack/plugins/reporting/server/export_types/csv/types.d.ts
+++ b/x-pack/plugins/reporting/server/export_types/csv/types.d.ts
@@ -4,7 +4,7 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-import { ScheduledTaskParams } from '../../types';
+import { CreateJobBaseParams, ScheduledTaskParams } from '../../types';
 
 export type RawValue = string | object | null | undefined;
 
@@ -28,10 +28,8 @@ export interface IndexPatternSavedObject {
   };
 }
 
-export interface JobParamsDiscoverCsv {
-  browserTimezone: string;
+export interface JobParamsDiscoverCsv extends CreateJobBaseParams {
   indexPatternId: string;
-  objectType: string;
   title: string;
   searchRequest: SearchRequest;
   fields: string[];
diff --git a/x-pack/plugins/reporting/server/export_types/csv_from_savedobject/execute_job.ts b/x-pack/plugins/reporting/server/export_types/csv_from_savedobject/execute_job.ts
index 0cc9ec16ed71bc..ec7e0a21f0498a 100644
--- a/x-pack/plugins/reporting/server/export_types/csv_from_savedobject/execute_job.ts
+++ b/x-pack/plugins/reporting/server/export_types/csv_from_savedobject/execute_job.ts
@@ -41,7 +41,7 @@ export const runTaskFnFactory: RunTaskFnFactory = function e
   // jobID is only for "queued" jobs
   // Use the jobID as a logging tag or "immediate"
   const { jobParams } = jobPayload;
-  const jobLogger = logger.clone([jobId === null ? 'immediate' : jobId]);
+  const jobLogger = logger.clone(['immediate']);
   const generateCsv = createGenerateCsv(jobLogger);
   const { panel, visType } = jobParams as JobParamsPanelCsv & { panel: SearchPanel };
diff --git a/x-pack/plugins/reporting/server/export_types/png/create_job/index.ts b/x-pack/plugins/reporting/server/export_types/png/create_job/index.ts
index 9227354520b6eb..2252177e980850 100644
--- a/x-pack/plugins/reporting/server/export_types/png/create_job/index.ts
+++ b/x-pack/plugins/reporting/server/export_types/png/create_job/index.ts
@@ -5,11 +5,11 @@
  */
 
 import { cryptoFactory } from '../../../lib';
-import { ESQueueCreateJobFn, ScheduleTaskFnFactory } from '../../../types';
+import { CreateJobFn, ScheduleTaskFnFactory } from '../../../types';
 import { validateUrls } from '../../common';
 import { JobParamsPNG } from '../types';
 
-export const scheduleTaskFnFactory: ScheduleTaskFnFactory<ESQueueCreateJobFn<
+export const scheduleTaskFnFactory: ScheduleTaskFnFactory<CreateJobFn<
   JobParamsPNG
 >> = function createJobFactoryFn(reporting) {
   const config = reporting.getConfig();
diff --git a/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts b/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts
index 9c7134736f4f61..35cd4139df413f 100644
--- a/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts
+++ b/x-pack/plugins/reporting/server/export_types/png/execute_job/index.ts
@@ -8,7 +8,7 @@ import apm from 'elastic-apm-node';
 import * as Rx from 'rxjs';
 import { catchError, map, mergeMap, takeUntil } from 'rxjs/operators';
 import { PNG_JOB_TYPE } from '../../../../common/constants';
-import { ESQueueWorkerExecuteFn, RunTaskFnFactory, TaskRunResult } from '../../..//types';
+import { WorkerExecuteFn, RunTaskFnFactory, TaskRunResult } from '../../..//types';
 import {
   decryptJobHeaders,
   getConditionalHeaders,
@@ -18,7 +18,7 @@ import {
 import { generatePngObservableFactory } from '../lib/generate_png';
 import { ScheduledTaskParamsPNG } from '../types';
 
-type QueuedPngExecutorFactory = RunTaskFnFactory<ESQueueWorkerExecuteFn<ScheduledTaskParamsPNG>>;
+type QueuedPngExecutorFactory = RunTaskFnFactory<WorkerExecuteFn<ScheduledTaskParamsPNG>>;
 
 export const runTaskFnFactory: QueuedPngExecutorFactory = function executeJobFactoryFn(
   reporting,
diff --git a/x-pack/plugins/reporting/server/export_types/png/index.ts b/x-pack/plugins/reporting/server/export_types/png/index.ts
index 25b4dbd60535b4..c966dedb6b076d 100644
--- a/x-pack/plugins/reporting/server/export_types/png/index.ts
+++ b/x-pack/plugins/reporting/server/export_types/png/index.ts
@@ -12,17 +12,17 @@ import {
   LICENSE_TYPE_TRIAL,
   PNG_JOB_TYPE as jobType,
 } from '../../../common/constants';
-import { ESQueueCreateJobFn, ESQueueWorkerExecuteFn, ExportTypeDefinition } from '../../types';
-import { metadata } from './metadata';
+import { CreateJobFn, WorkerExecuteFn, ExportTypeDefinition } from '../../types';
 import { scheduleTaskFnFactory } from './create_job';
 import { runTaskFnFactory } from './execute_job';
+import { metadata } from './metadata';
 import { JobParamsPNG, ScheduledTaskParamsPNG } from './types';
 
 export const getExportType = (): ExportTypeDefinition<
   JobParamsPNG,
-  ESQueueCreateJobFn,
+  CreateJobFn,
   ScheduledTaskParamsPNG,
-  ESQueueWorkerExecuteFn
+  WorkerExecuteFn
 > => ({
   ...metadata,
   jobType,
diff --git a/x-pack/plugins/reporting/server/export_types/png/types.d.ts b/x-pack/plugins/reporting/server/export_types/png/types.d.ts
index 4c40f55f0f0d69..1ddee8419df309 100644
--- a/x-pack/plugins/reporting/server/export_types/png/types.d.ts
+++ b/x-pack/plugins/reporting/server/export_types/png/types.d.ts
@@ -4,16 +4,13 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-import { ScheduledTaskParams } from '../../../server/types';
+import { CreateJobBaseParams, ScheduledTaskParams } from '../../../server/types';
 import { LayoutInstance, LayoutParams } from '../../lib/layouts';
 
 // Job params: structure of incoming user request data
-export interface JobParamsPNG {
-  objectType: string;
+export interface JobParamsPNG extends CreateJobBaseParams {
   title: string;
   relativeUrl: string;
-  browserTimezone: string;
-  layout: LayoutInstance;
 }
 
 // Job payload: structure of stored job data provided by create_job
diff --git a/x-pack/plugins/reporting/server/export_types/printable_pdf/create_job/index.ts b/x-pack/plugins/reporting/server/export_types/printable_pdf/create_job/index.ts
index 4540983129ebcf..5de089a13bfa44 100644
--- a/x-pack/plugins/reporting/server/export_types/printable_pdf/create_job/index.ts
+++ b/x-pack/plugins/reporting/server/export_types/printable_pdf/create_job/index.ts
@@ -4,12 +4,12 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-import { validateUrls } from '../../common';
 import { cryptoFactory } from '../../../lib';
-import { ESQueueCreateJobFn, ScheduleTaskFnFactory } from '../../../types';
+import { CreateJobFn, ScheduleTaskFnFactory } from '../../../types';
+import { validateUrls } from '../../common';
 import { JobParamsPDF } from '../types';
 
-export const scheduleTaskFnFactory: ScheduleTaskFnFactory<ESQueueCreateJobFn<
+export const scheduleTaskFnFactory: ScheduleTaskFnFactory<CreateJobFn<
   JobParamsPDF
 >> = function createJobFactoryFn(reporting) {
   const config = reporting.getConfig();
diff --git a/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts b/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts
index eb15c0a71ca3f2..5ace1c987adb5a 100644
--- a/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts
+++ b/x-pack/plugins/reporting/server/export_types/printable_pdf/execute_job/index.ts
@@ -8,7 +8,7 @@ import apm from 'elastic-apm-node';
 import * as Rx from 'rxjs';
 import { catchError, map, mergeMap, takeUntil } from 'rxjs/operators';
 import { PDF_JOB_TYPE } from '../../../../common/constants';
-import { ESQueueWorkerExecuteFn, RunTaskFnFactory, TaskRunResult } from '../../../types';
+import { WorkerExecuteFn, RunTaskFnFactory, TaskRunResult } from '../../../types';
 import {
   decryptJobHeaders,
   getConditionalHeaders,
@@ -19,7 +19,7 @@ import {
 import { generatePdfObservableFactory } from '../lib/generate_pdf';
 import { ScheduledTaskParamsPDF } from '../types';
 
-type QueuedPdfExecutorFactory = RunTaskFnFactory<ESQueueWorkerExecuteFn<ScheduledTaskParamsPDF>>;
+type QueuedPdfExecutorFactory = RunTaskFnFactory<WorkerExecuteFn<ScheduledTaskParamsPDF>>;
 
 export const runTaskFnFactory: QueuedPdfExecutorFactory = function executeJobFactoryFn(
   reporting,
diff --git a/x-pack/plugins/reporting/server/export_types/printable_pdf/index.ts b/x-pack/plugins/reporting/server/export_types/printable_pdf/index.ts
index e5115c243c6972..7f21d36c4b72c4 100644
--- a/x-pack/plugins/reporting/server/export_types/printable_pdf/index.ts
+++ b/x-pack/plugins/reporting/server/export_types/printable_pdf/index.ts
@@ -12,17 +12,17 @@ import {
   LICENSE_TYPE_TRIAL,
   PDF_JOB_TYPE as jobType,
 } from '../../../common/constants';
-import { ESQueueCreateJobFn, ESQueueWorkerExecuteFn, ExportTypeDefinition } from '../../types';
-import { metadata } from './metadata';
+import { CreateJobFn, WorkerExecuteFn, ExportTypeDefinition } from '../../types';
 import { scheduleTaskFnFactory } from './create_job';
 import { runTaskFnFactory } from './execute_job';
+import { metadata } from './metadata';
 import { JobParamsPDF, ScheduledTaskParamsPDF } from './types';
 
 export const getExportType = (): ExportTypeDefinition<
   JobParamsPDF,
-  ESQueueCreateJobFn,
+  CreateJobFn,
   ScheduledTaskParamsPDF,
-  ESQueueWorkerExecuteFn
+  WorkerExecuteFn
 > => ({
   ...metadata,
   jobType,
diff --git a/x-pack/plugins/reporting/server/export_types/printable_pdf/types.d.ts b/x-pack/plugins/reporting/server/export_types/printable_pdf/types.d.ts
index cba0f41f075367..7830f87780c2eb 100644
--- a/x-pack/plugins/reporting/server/export_types/printable_pdf/types.d.ts
+++ b/x-pack/plugins/reporting/server/export_types/printable_pdf/types.d.ts
@@ -4,15 +4,13 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-import { ScheduledTaskParams } from '../../../server/types';
+import { CreateJobBaseParams, ScheduledTaskParams } from '../../../server/types';
 import { LayoutInstance, LayoutParams } from '../../lib/layouts';
 
 // Job params: structure of incoming user request data, after being parsed from RISON
-export interface JobParamsPDF {
-  objectType: string; // visualization, dashboard, etc. Used for job info & telemetry
+export interface JobParamsPDF extends CreateJobBaseParams {
   title: string;
   relativeUrls: string[];
-  browserTimezone: string;
   layout: LayoutInstance;
 }
diff --git a/x-pack/plugins/reporting/server/lib/create_worker.ts b/x-pack/plugins/reporting/server/lib/create_worker.ts
index 837be1f44a0930..5b0f1ddb2f1579 100644
--- a/x-pack/plugins/reporting/server/lib/create_worker.ts
+++ b/x-pack/plugins/reporting/server/lib/create_worker.ts
@@ -8,7 +8,7 @@ import { CancellationToken } from '../../common';
 import { PLUGIN_ID } from '../../common/constants';
 import { ReportingCore } from '../../server';
 import { LevelLogger } from '../../server/lib';
-import { ESQueueWorkerExecuteFn, ExportTypeDefinition, JobSource } from '../../server/types';
+import { ExportTypeDefinition, JobSource, WorkerExecuteFn } from '../../server/types';
 import { ESQueueInstance } from './create_queue';
 // @ts-ignore untyped dependency
 import { events as esqueueEvents } from './esqueue';
@@ -22,10 +22,10 @@ export function createWorkerFactory(reporting: ReportingCore, log
   // Once more document types are added, this will need to be passed in
   return async function createWorker(queue: ESQueueInstance) {
     // export type / execute job map
-    const jobExecutors: Map<string, ESQueueWorkerExecuteFn<unknown>> = new Map();
+    const jobExecutors: Map<string, WorkerExecuteFn<unknown>> = new Map();
 
     for (const exportType of reporting.getExportTypesRegistry().getAll() as Array<
-      ExportTypeDefinition<unknown, unknown, unknown, ESQueueWorkerExecuteFn<unknown>>
+      ExportTypeDefinition<unknown, unknown, unknown, WorkerExecuteFn<unknown>>
     >) {
       const jobExecutor = exportType.runTaskFnFactory(reporting, logger);
       jobExecutors.set(exportType.jobType, jobExecutor);
diff --git a/x-pack/plugins/reporting/server/lib/enqueue_job.ts b/x-pack/plugins/reporting/server/lib/enqueue_job.ts
index d1554a03b9389a..31960c782b7b9c 100644
--- a/x-pack/plugins/reporting/server/lib/enqueue_job.ts
+++ b/x-pack/plugins/reporting/server/lib/enqueue_job.ts
@@ -5,15 +5,15 @@
  */
 
 import { KibanaRequest, RequestHandlerContext } from 'src/core/server';
+import { ReportingCore } from '../';
 import { AuthenticatedUser } from '../../../security/server';
-import { ESQueueCreateJobFn } from '../../server/types';
-import { ReportingCore } from '../core';
+import { CreateJobBaseParams, CreateJobFn } from '../types';
 import { LevelLogger } from './';
-import { ReportingStore, Report } from './store';
+import { Report } from './store';
 
 export type EnqueueJobFn = (
   exportTypeId: string,
-  jobParams: unknown,
+  jobParams: CreateJobBaseParams,
   user: AuthenticatedUser | null,
   context:
RequestHandlerContext, request: KibanaRequest @@ -21,41 +21,39 @@ export type EnqueueJobFn = ( export function enqueueJobFactory( reporting: ReportingCore, - store: ReportingStore, parentLogger: LevelLogger ): EnqueueJobFn { - const config = reporting.getConfig(); - const queueTimeout = config.get('queue', 'timeout'); - const browserType = config.get('capture', 'browser', 'type'); - const maxAttempts = config.get('capture', 'maxAttempts'); const logger = parentLogger.clone(['queue-job']); return async function enqueueJob( exportTypeId: string, - jobParams: unknown, + jobParams: CreateJobBaseParams, user: AuthenticatedUser | null, context: RequestHandlerContext, request: KibanaRequest ) { - type ScheduleTaskFnType = ESQueueCreateJobFn; + type ScheduleTaskFnType = CreateJobFn; - const username = user ? user.username : false; + const username: string | null = user ? user.username : null; const exportType = reporting.getExportTypesRegistry().getById(exportTypeId); if (exportType == null) { throw new Error(`Export type ${exportTypeId} does not exist in the registry!`); } - const scheduleTask = exportType.scheduleTaskFnFactory(reporting, logger) as ScheduleTaskFnType; + const [scheduleTask, { store }] = await Promise.all([ + exportType.scheduleTaskFnFactory(reporting, logger) as ScheduleTaskFnType, + reporting.getPluginStartDeps(), + ]); + + // add encrytped headers const payload = await scheduleTask(jobParams, context, request); - const options = { - timeout: queueTimeout, - created_by: username, - browser_type: browserType, - max_attempts: maxAttempts, - }; + // store the pending report, puts it in the Reporting Management UI table + const report = await store.addReport(exportType.jobType, username, payload); + + logger.info(`Scheduled ${exportType.name} report: ${report._id}`); - return await store.addReport(exportType.jobType, payload, options); + return report; }; } diff --git a/x-pack/plugins/reporting/server/lib/esqueue/constants/index.js b/x-pack/plugins/reporting/server/lib/esqueue/constants/index.js index 5fcff3531851a6..7f7383bb8611d6 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/constants/index.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/constants/index.js @@ -4,9 +4,9 @@ * you may not use this file except in compliance with the Elastic License. 
*/ -import { events } from './events'; -import { statuses } from './statuses'; +import { statuses } from '../../statuses'; import { defaultSettings } from './default_settings'; +import { events } from './events'; export const constants = { ...events, diff --git a/x-pack/plugins/reporting/server/lib/esqueue/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/worker.js index 469bafd6946122..0c3a6384f6b9ae 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/worker.js @@ -158,26 +158,18 @@ export class Worker extends events.EventEmitter { kibana_name: this.kibanaName, }; - return this.queue.store - .updateReport({ - index: job._index, - id: job._id, - if_seq_no: job._seq_no, - if_primary_term: job._primary_term, - body: { doc }, - }) - .then((response) => { - this.info(`Job marked as claimed: ${getUpdatedDocPath(response)}`); - const updatedJob = { - ...job, - ...response, - }; - updatedJob._source = { - ...job._source, - ...doc, - }; - return updatedJob; - }); + return this.queue.store.setReportClaimed(job, doc).then((response) => { + this.info(`Job marked as claimed: ${getUpdatedDocPath(response)}`); + const updatedJob = { + ...job, + ...response, + }; + updatedJob._source = { + ...job._source, + ...doc, + }; + return updatedJob; + }); } _failJob(job, output = false) { @@ -198,13 +190,7 @@ export class Worker extends events.EventEmitter { }); return this.queue.store - .updateReport({ - index: job._index, - id: job._id, - if_seq_no: job._seq_no, - if_primary_term: job._primary_term, - body: { doc }, - }) + .setReportFailed(job, doc) .then((response) => { this.info(`Job marked as failed: ${getUpdatedDocPath(response)}`); }) @@ -295,13 +281,7 @@ export class Worker extends events.EventEmitter { }; return this.queue.store - .updateReport({ - index: job._index, - id: job._id, - if_seq_no: job._seq_no, - if_primary_term: job._primary_term, - body: { doc }, - }) + .setReportCompleted(job, doc) .then((response) => { const eventOutput = { job: formatJobObject(job), diff --git a/x-pack/plugins/reporting/server/lib/index.ts b/x-pack/plugins/reporting/server/lib/index.ts index e4adb1188e3fc4..f3a09cffbb1047 100644 --- a/x-pack/plugins/reporting/server/lib/index.ts +++ b/x-pack/plugins/reporting/server/lib/index.ts @@ -8,8 +8,9 @@ export { checkLicense } from './check_license'; export { createQueueFactory } from './create_queue'; export { cryptoFactory } from './crypto'; export { enqueueJobFactory } from './enqueue_job'; -export { getExportTypesRegistry } from './export_types_registry'; +export { ExportTypesRegistry, getExportTypesRegistry } from './export_types_registry'; export { LevelLogger } from './level_logger'; +export { statuses } from './statuses'; export { ReportingStore } from './store'; export { startTrace } from './trace'; export { runValidations } from './validate'; diff --git a/x-pack/plugins/reporting/server/lib/esqueue/constants/statuses.ts b/x-pack/plugins/reporting/server/lib/statuses.ts similarity index 100% rename from x-pack/plugins/reporting/server/lib/esqueue/constants/statuses.ts rename to x-pack/plugins/reporting/server/lib/statuses.ts diff --git a/x-pack/plugins/reporting/server/lib/store/mapping.ts b/x-pack/plugins/reporting/server/lib/store/mapping.ts index a819923e2f1054..d08b928cdca4be 100644 --- a/x-pack/plugins/reporting/server/lib/store/mapping.ts +++ b/x-pack/plugins/reporting/server/lib/store/mapping.ts @@ -45,7 +45,7 @@ export const mapping = { priority: { type: 'byte' }, timeout: { type: 'long' }, 
process_expiration: { type: 'date' }, - created_by: { type: 'keyword' }, + created_by: { type: 'keyword' }, // `null` if security is disabled created_at: { type: 'date' }, started_at: { type: 'date' }, completed_at: { type: 'date' }, diff --git a/x-pack/plugins/reporting/server/lib/store/report.test.ts b/x-pack/plugins/reporting/server/lib/store/report.test.ts index 83444494e61d33..9ac5d1f87c387f 100644 --- a/x-pack/plugins/reporting/server/lib/store/report.test.ts +++ b/x-pack/plugins/reporting/server/lib/store/report.test.ts @@ -8,46 +8,59 @@ import { Report } from './report'; describe('Class Report', () => { it('constructs Report instance', () => { - const opts = { - index: '.reporting-test-index-12345', + const report = new Report({ + _index: '.reporting-test-index-12345', jobtype: 'test-report', created_by: 'created_by_test_string', browser_type: 'browser_type_test_string', max_attempts: 50, - payload: { payload_test_field: 1 }, + payload: { headers: 'payload_test_field', objectType: 'testOt' }, timeout: 30000, priority: 1, - }; - const report = new Report(opts); - expect(report.toJSON()).toMatchObject({ - _primary_term: undefined, - _seq_no: undefined, + }); + + expect(report.toEsDocsJSON()).toMatchObject({ + _index: '.reporting-test-index-12345', + _source: { + attempts: 0, + browser_type: 'browser_type_test_string', + completed_at: undefined, + created_at: undefined, + created_by: 'created_by_test_string', + jobtype: 'test-report', + max_attempts: 50, + meta: undefined, + payload: { headers: 'payload_test_field', objectType: 'testOt' }, + priority: 1, + started_at: undefined, + status: 'pending', + timeout: 30000, + }, + }); + expect(report.toApiJSON()).toMatchObject({ browser_type: 'browser_type_test_string', created_by: 'created_by_test_string', jobtype: 'test-report', max_attempts: 50, - payload: { - payload_test_field: 1, - }, + payload: { headers: 'payload_test_field', objectType: 'testOt' }, priority: 1, timeout: 30000, }); - expect(report.id).toBeDefined(); + expect(report._id).toBeDefined(); }); - it('updateWithDoc method syncs takes fields to sync ES metadata', () => { - const opts = { - index: '.reporting-test-index-12345', + it('updateWithEsDoc method syncs fields to sync ES metadata', () => { + const report = new Report({ + _index: '.reporting-test-index-12345', jobtype: 'test-report', created_by: 'created_by_test_string', browser_type: 'browser_type_test_string', max_attempts: 50, - payload: { payload_test_field: 1 }, + payload: { headers: 'payload_test_field', objectType: 'testOt' }, timeout: 30000, priority: 1, - }; - const report = new Report(opts); + }); const metadata = { _index: '.reporting-test-update', @@ -55,23 +68,53 @@ describe('Class Report', () => { _primary_term: 77, _seq_no: 99, }; - report.updateWithDoc(metadata); - - expect(report.toJSON()).toMatchObject({ - index: '.reporting-test-update', - _primary_term: 77, - _seq_no: 99, - browser_type: 'browser_type_test_string', - created_by: 'created_by_test_string', - jobtype: 'test-report', - max_attempts: 50, - payload: { - payload_test_field: 1, - }, - priority: 1, - timeout: 30000, - }); + report.updateWithEsDoc(metadata); - expect(report._id).toBe('12342p9o387549o2345'); + expect(report.toEsDocsJSON()).toMatchInlineSnapshot(` + Object { + "_id": "12342p9o387549o2345", + "_index": ".reporting-test-update", + "_source": Object { + "attempts": 0, + "browser_type": "browser_type_test_string", + "completed_at": undefined, + "created_at": undefined, + "created_by": "created_by_test_string", + "jobtype": 
"test-report", + "max_attempts": 50, + "meta": undefined, + "payload": Object { + "headers": "payload_test_field", + "objectType": "testOt", + }, + "priority": 1, + "started_at": undefined, + "status": "pending", + "timeout": 30000, + }, + } + `); + expect(report.toApiJSON()).toMatchInlineSnapshot(` + Object { + "attempts": 0, + "browser_type": "browser_type_test_string", + "completed_at": undefined, + "created_at": undefined, + "created_by": "created_by_test_string", + "id": "12342p9o387549o2345", + "index": ".reporting-test-update", + "jobtype": "test-report", + "max_attempts": 50, + "meta": undefined, + "payload": Object { + "headers": "payload_test_field", + "objectType": "testOt", + }, + "priority": 1, + "started_at": undefined, + "status": "pending", + "timeout": 30000, + } + `); }); }); diff --git a/x-pack/plugins/reporting/server/lib/store/report.ts b/x-pack/plugins/reporting/server/lib/store/report.ts index cc9967e64b6ebc..5ff71ae7a71824 100644 --- a/x-pack/plugins/reporting/server/lib/store/report.ts +++ b/x-pack/plugins/reporting/server/lib/store/report.ts @@ -6,80 +6,158 @@ // @ts-ignore no module definition import Puid from 'puid'; +import { JobStatuses } from '../../../constants'; +import { LayoutInstance } from '../layouts'; -interface Payload { - id?: string; - index: string; +/* + * The document created by Reporting to store in the .reporting index + */ +interface ReportingDocument { + _id: string; + _index: string; + _seq_no: unknown; + _primary_term: unknown; jobtype: string; - created_by: string | boolean; - payload: unknown; + created_by: string | null; + payload: { + headers: string; // encrypted headers + objectType: string; + layout?: LayoutInstance; + }; + meta: unknown; browser_type: string; - priority: number; max_attempts: number; timeout: number; + + status: string; + attempts: number; + output?: unknown; + started_at?: string; + completed_at?: string; + created_at?: string; + priority?: number; + process_expiration?: string; } +/* + * The document created by Reporting to store as task parameters for Task + * Manager to reference the report in .reporting + */ const puid = new Puid(); -export class Report { - public readonly jobtype: string; - public readonly created_by: string | boolean; - public readonly payload: unknown; - public readonly browser_type: string; - public readonly id: string; +export class Report implements Partial { + public _index?: string; + public _id: string; + public _primary_term?: unknown; // set by ES + public _seq_no: unknown; // set by ES - public readonly priority: number; - // queue stuff, to be removed with Task Manager integration + public readonly jobtype: string; + public readonly created_at?: string; + public readonly created_by?: string | null; + public readonly payload: { + headers: string; // encrypted headers + objectType: string; + layout?: LayoutInstance; + }; + public readonly meta: unknown; public readonly max_attempts: number; - public readonly timeout: number; + public readonly browser_type?: string; - public _index: string; - public _id?: string; // set by ES - public _primary_term?: unknown; // set by ES - public _seq_no: unknown; // set by ES + public readonly status: string; + public readonly attempts: number; + public readonly output?: unknown; + public readonly started_at?: string; + public readonly completed_at?: string; + public readonly process_expiration?: string; + public readonly priority?: number; + public readonly timeout?: number; /* * Create an unsaved report */ - constructor(opts: Payload) { - 
this.jobtype = opts.jobtype; + constructor(opts: Partial) { + this._id = opts._id != null ? opts._id : puid.generate(); + this._index = opts._index; + this._primary_term = opts._primary_term; + this._seq_no = opts._seq_no; + + this.payload = opts.payload!; + this.jobtype = opts.jobtype!; + this.max_attempts = opts.max_attempts!; + this.attempts = opts.attempts || 0; + + this.process_expiration = opts.process_expiration; + this.timeout = opts.timeout; + + this.created_at = opts.created_at; this.created_by = opts.created_by; - this.payload = opts.payload; + this.meta = opts.meta; this.browser_type = opts.browser_type; this.priority = opts.priority; - this.max_attempts = opts.max_attempts; - this.timeout = opts.timeout; - this.id = puid.generate(); - this._index = opts.index; + this.status = opts.status || JobStatuses.PENDING; + this.output = opts.output || null; } /* * Update the report with "live" storage metadata */ - updateWithDoc(doc: Partial) { - if (doc._index) { - this._index = doc._index; // can not be undefined + updateWithEsDoc(doc: Partial) { + if (doc._index == null || doc._id == null) { + throw new Error(`Report object from ES has missing fields!`); } this._id = doc._id; + this._index = doc._index; this._primary_term = doc._primary_term; this._seq_no = doc._seq_no; } - toJSON() { + /* + * Data structure for writing to Elasticsearch index + */ + toEsDocsJSON() { + return { + _id: this._id, + _index: this._index, + _source: { + jobtype: this.jobtype, + created_at: this.created_at, + created_by: this.created_by, + payload: this.payload, + meta: this.meta, + timeout: this.timeout, + max_attempts: this.max_attempts, + priority: this.priority, + browser_type: this.browser_type, + status: this.status, + attempts: this.attempts, + started_at: this.started_at, + completed_at: this.completed_at, + }, + }; + } + + /* + * Data structure for API responses + */ + toApiJSON() { return { - id: this.id, + id: this._id, index: this._index, - _seq_no: this._seq_no, - _primary_term: this._primary_term, jobtype: this.jobtype, + created_at: this.created_at, created_by: this.created_by, payload: this.payload, + meta: this.meta, timeout: this.timeout, max_attempts: this.max_attempts, priority: this.priority, browser_type: this.browser_type, + status: this.status, + attempts: this.attempts, + started_at: this.started_at, + completed_at: this.completed_at, }; } } diff --git a/x-pack/plugins/reporting/server/lib/store/store.test.ts b/x-pack/plugins/reporting/server/lib/store/store.test.ts index 4868a1dfdd8f3a..c66e2dd7742c4f 100644 --- a/x-pack/plugins/reporting/server/lib/store/store.test.ts +++ b/x-pack/plugins/reporting/server/lib/store/store.test.ts @@ -5,11 +5,12 @@ */ import sinon from 'sinon'; +import { ElasticsearchServiceSetup } from 'src/core/server'; import { ReportingConfig, ReportingCore } from '../..'; import { createMockReportingCore } from '../../test_helpers'; import { createMockLevelLogger } from '../../test_helpers/create_mock_levellogger'; +import { Report } from './report'; import { ReportingStore } from './store'; -import { ElasticsearchServiceSetup } from 'src/core/server'; const getMockConfig = (mockConfigGet: sinon.SinonStub) => ({ get: mockConfigGet, @@ -31,11 +32,13 @@ describe('ReportingStore', () => { mockConfig = getMockConfig(mockConfigGet); mockCore = await createMockReportingCore(mockConfig); + callClusterStub.reset(); callClusterStub.withArgs('indices.exists').resolves({}); callClusterStub.withArgs('indices.create').resolves({}); - 
callClusterStub.withArgs('index').resolves({}); + callClusterStub.withArgs('index').resolves({ _id: 'stub-id', _index: 'stub-index' }); callClusterStub.withArgs('indices.refresh').resolves({}); callClusterStub.withArgs('update').resolves({}); + callClusterStub.withArgs('get').resolves({}); mockCore.getElasticsearchService = () => (mockElasticsearch as unknown) as ElasticsearchServiceSetup; @@ -45,25 +48,25 @@ describe('ReportingStore', () => { it('returns Report object', async () => { const store = new ReportingStore(mockCore, mockLogger); const reportType = 'unknowntype'; - const reportPayload = {}; - const reportOptions = { - timeout: 10000, - created_by: 'created_by_string', - browser_type: 'browser_type_string', - max_attempts: 1, + const reportPayload = { + browserTimezone: 'UTC', + headers: 'rp_headers_1', + objectType: 'testOt', }; - await expect( - store.addReport(reportType, reportPayload, reportOptions) - ).resolves.toMatchObject({ + await expect(store.addReport(reportType, 'username1', reportPayload)).resolves.toMatchObject({ _primary_term: undefined, _seq_no: undefined, - browser_type: 'browser_type_string', - created_by: 'created_by_string', + attempts: 0, + browser_type: undefined, + completed_at: undefined, + created_by: 'username1', jobtype: 'unknowntype', - max_attempts: 1, + max_attempts: undefined, payload: {}, priority: 10, - timeout: 10000, + started_at: undefined, + status: 'pending', + timeout: undefined, }); }); @@ -76,35 +79,31 @@ describe('ReportingStore', () => { const store = new ReportingStore(mockCore, mockLogger); const reportType = 'unknowntype'; - const reportPayload = {}; - const reportOptions = { - timeout: 10000, - created_by: 'created_by_string', - browser_type: 'browser_type_string', - max_attempts: 1, + const reportPayload = { + browserTimezone: 'UTC', + headers: 'rp_headers_2', + objectType: 'testOt', }; - expect( - store.addReport(reportType, reportPayload, reportOptions) - ).rejects.toMatchInlineSnapshot(`[Error: Invalid index interval: centurially]`); + expect(store.addReport(reportType, 'user1', reportPayload)).rejects.toMatchInlineSnapshot( + `[Error: Invalid index interval: centurially]` + ); }); it('handles error creating the index', async () => { // setup callClusterStub.withArgs('indices.exists').resolves(false); - callClusterStub.withArgs('indices.create').rejects(new Error('error')); + callClusterStub.withArgs('indices.create').rejects(new Error('horrible error')); const store = new ReportingStore(mockCore, mockLogger); const reportType = 'unknowntype'; - const reportPayload = {}; - const reportOptions = { - timeout: 10000, - created_by: 'created_by_string', - browser_type: 'browser_type_string', - max_attempts: 1, + const reportPayload = { + browserTimezone: 'UTC', + headers: 'rp_headers_3', + objectType: 'testOt', }; await expect( - store.addReport(reportType, reportPayload, reportOptions) - ).rejects.toMatchInlineSnapshot(`[Error: error]`); + store.addReport(reportType, 'user1', reportPayload) + ).rejects.toMatchInlineSnapshot(`[Error: horrible error]`); }); /* Creating the index will fail, if there were multiple jobs staged in @@ -116,20 +115,18 @@ describe('ReportingStore', () => { it('ignores index creation error if the index already exists and continues adding the report', async () => { // setup callClusterStub.withArgs('indices.exists').resolves(false); - callClusterStub.withArgs('indices.create').rejects(new Error('error')); + callClusterStub.withArgs('indices.create').rejects(new Error('devastating error')); const store = new 
ReportingStore(mockCore, mockLogger); const reportType = 'unknowntype'; - const reportPayload = {}; - const reportOptions = { - timeout: 10000, - created_by: 'created_by_string', - browser_type: 'browser_type_string', - max_attempts: 1, + const reportPayload = { + browserTimezone: 'UTC', + headers: 'rp_headers_4', + objectType: 'testOt', }; await expect( - store.addReport(reportType, reportPayload, reportOptions) - ).rejects.toMatchInlineSnapshot(`[Error: error]`); + store.addReport(reportType, 'user1', reportPayload) + ).rejects.toMatchInlineSnapshot(`[Error: devastating error]`); }); it('skips creating the index if already exists', async () => { @@ -141,26 +138,223 @@ describe('ReportingStore', () => { const store = new ReportingStore(mockCore, mockLogger); const reportType = 'unknowntype'; - const reportPayload = {}; - const reportOptions = { - timeout: 10000, - created_by: 'created_by_string', - browser_type: 'browser_type_string', - max_attempts: 1, + const reportPayload = { + browserTimezone: 'UTC', + headers: 'rp_headers_5', + objectType: 'testOt', }; - await expect( - store.addReport(reportType, reportPayload, reportOptions) - ).resolves.toMatchObject({ + await expect(store.addReport(reportType, 'user1', reportPayload)).resolves.toMatchObject({ + _primary_term: undefined, + _seq_no: undefined, + attempts: 0, + browser_type: undefined, + completed_at: undefined, + created_by: 'user1', + jobtype: 'unknowntype', + max_attempts: undefined, + payload: {}, + priority: 10, + started_at: undefined, + status: 'pending', + timeout: undefined, + }); + }); + + it('allows username string to be `null`', async () => { + // setup + callClusterStub.withArgs('indices.exists').resolves(false); + callClusterStub + .withArgs('indices.create') + .rejects(new Error('resource_already_exists_exception')); // will be triggered but ignored + + const store = new ReportingStore(mockCore, mockLogger); + const reportType = 'unknowntype'; + const reportPayload = { + browserTimezone: 'UTC', + headers: 'rp_test_headers', + objectType: 'testOt', + }; + await expect(store.addReport(reportType, null, reportPayload)).resolves.toMatchObject({ _primary_term: undefined, _seq_no: undefined, - browser_type: 'browser_type_string', - created_by: 'created_by_string', + attempts: 0, + browser_type: undefined, + completed_at: undefined, + created_by: null, jobtype: 'unknowntype', - max_attempts: 1, + max_attempts: undefined, payload: {}, priority: 10, - timeout: 10000, + started_at: undefined, + status: 'pending', + timeout: undefined, }); }); }); + + it('setReportClaimed sets the status of a record to processing', async () => { + const store = new ReportingStore(mockCore, mockLogger); + const report = new Report({ + _id: 'id-of-processing', + _index: '.reporting-test-index-12345', + jobtype: 'test-report', + created_by: 'created_by_test_string', + browser_type: 'browser_type_test_string', + max_attempts: 50, + payload: { + headers: 'rp_test_headers', + objectType: 'testOt', + }, + timeout: 30000, + priority: 1, + }); + + await store.setReportClaimed(report, { testDoc: 'test' } as any); + + const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update'); + expect(updateCall && updateCall.args).toMatchInlineSnapshot(` + Array [ + "update", + Object { + "body": Object { + "doc": Object { + "status": "processing", + "testDoc": "test", + }, + }, + "id": "id-of-processing", + "if_primary_term": undefined, + "if_seq_no": undefined, + "index": ".reporting-test-index-12345", + }, + ] + `); + }); + + 
it('setReportFailed sets the status of a record to failed', async () => { + const store = new ReportingStore(mockCore, mockLogger); + const report = new Report({ + _id: 'id-of-failure', + _index: '.reporting-test-index-12345', + jobtype: 'test-report', + created_by: 'created_by_test_string', + browser_type: 'browser_type_test_string', + max_attempts: 50, + payload: { + headers: 'rp_test_headers', + objectType: 'testOt', + }, + timeout: 30000, + priority: 1, + }); + + await store.setReportFailed(report, { errors: 'yes' } as any); + + const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update'); + expect(updateCall && updateCall.args).toMatchInlineSnapshot(` + Array [ + "update", + Object { + "body": Object { + "doc": Object { + "errors": "yes", + "status": "failed", + }, + }, + "id": "id-of-failure", + "if_primary_term": undefined, + "if_seq_no": undefined, + "index": ".reporting-test-index-12345", + }, + ] + `); + }); + + it('setReportCompleted sets the status of a record to completed', async () => { + const store = new ReportingStore(mockCore, mockLogger); + const report = new Report({ + _id: 'vastly-great-report-id', + _index: '.reporting-test-index-12345', + jobtype: 'test-report', + created_by: 'created_by_test_string', + browser_type: 'browser_type_test_string', + max_attempts: 50, + payload: { + headers: 'rp_test_headers', + objectType: 'testOt', + }, + timeout: 30000, + priority: 1, + }); + + await store.setReportCompleted(report, { certainly_completed: 'yes' } as any); + + const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update'); + expect(updateCall && updateCall.args).toMatchInlineSnapshot(` + Array [ + "update", + Object { + "body": Object { + "doc": Object { + "certainly_completed": "yes", + "status": "completed", + }, + }, + "id": "vastly-great-report-id", + "if_primary_term": undefined, + "if_seq_no": undefined, + "index": ".reporting-test-index-12345", + }, + ] + `); + }); + + it('setReportCompleted sets the status of a record to completed_with_warnings', async () => { + const store = new ReportingStore(mockCore, mockLogger); + const report = new Report({ + _id: 'vastly-great-report-id', + _index: '.reporting-test-index-12345', + jobtype: 'test-report', + created_by: 'created_by_test_string', + browser_type: 'browser_type_test_string', + max_attempts: 50, + payload: { + headers: 'rp_test_headers', + objectType: 'testOt', + }, + timeout: 30000, + priority: 1, + }); + + await store.setReportCompleted(report, { + certainly_completed: 'pretty_much', + output: { + warnings: [`those pants don't go with that shirt`], + }, + } as any); + + const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update'); + expect(updateCall && updateCall.args).toMatchInlineSnapshot(` + Array [ + "update", + Object { + "body": Object { + "doc": Object { + "certainly_completed": "pretty_much", + "output": Object { + "warnings": Array [ + "those pants don't go with that shirt", + ], + }, + "status": "completed_with_warnings", + }, + }, + "id": "vastly-great-report-id", + "if_primary_term": undefined, + "if_seq_no": undefined, + "index": ".reporting-test-index-12345", + }, + ] + `); + }); }); diff --git a/x-pack/plugins/reporting/server/lib/store/store.ts b/x-pack/plugins/reporting/server/lib/store/store.ts index 0f1ed83b717671..12cff0e973ed62 100644 --- a/x-pack/plugins/reporting/server/lib/store/store.ts +++ b/x-pack/plugins/reporting/server/lib/store/store.ts @@ -5,36 +5,24 @@ */ import { ElasticsearchServiceSetup } from 
'src/core/server';
-import { LevelLogger } from '../';
+import { LevelLogger, statuses } from '../';
 import { ReportingCore } from '../../';
+import { CreateJobBaseParams, CreateJobBaseParamsEncryptedFields } from '../../types';
 import { indexTimestamp } from './index_timestamp';
-import { LayoutInstance } from '../layouts';
 import { mapping } from './mapping';
 import { Report } from './report';
-
-export const statuses = {
-  JOB_STATUS_PENDING: 'pending',
-  JOB_STATUS_PROCESSING: 'processing',
-  JOB_STATUS_COMPLETED: 'completed',
-  JOB_STATUS_WARNINGS: 'completed_with_warnings',
-  JOB_STATUS_FAILED: 'failed',
-  JOB_STATUS_CANCELLED: 'cancelled',
-};
-
-interface AddReportOpts {
+interface JobSettings {
   timeout: number;
-  created_by: string | boolean;
   browser_type: string;
   max_attempts: number;
+  priority: number;
 }

-interface UpdateQuery {
-  index: string;
-  id: string;
-  if_seq_no: unknown;
-  if_primary_term: unknown;
-  body: { doc: Partial };
-}
+const checkReportIsEditable = (report: Report) => {
+  if (!report._id || !report._index) {
+    throw new Error(`Report object is not synced with ES!`);
+  }
+};

 /*
  * A class to give an interface to historical reports in the reporting.index
@@ -43,9 +31,9 @@ interface UpdateQuery {
  * - interface for downloading the report
  */
 export class ReportingStore {
-  public readonly indexPrefix: string;
-  public readonly indexInterval: string;
-
+  private readonly indexPrefix: string;
+  private readonly indexInterval: string;
+  private readonly jobSettings: JobSettings;
   private client: ElasticsearchServiceSetup['legacy']['client'];
   private logger: LevelLogger;
@@ -56,12 +44,18 @@ export class ReportingStore {
     this.client = elasticsearch.legacy.client;
     this.indexPrefix = config.get('index');
     this.indexInterval = config.get('queue', 'indexInterval');
+    this.jobSettings = {
+      timeout: config.get('queue', 'timeout'),
+      browser_type: config.get('capture', 'browser', 'type'),
+      max_attempts: config.get('capture', 'maxAttempts'),
+      priority: 10, // unused
+    };
     this.logger = logger;
   }

   private async createIndex(indexName: string) {
-    return this.client
+    return await this.client
       .callAsInternalUser('indices.exists', {
         index: indexName,
       })
@@ -95,75 +89,157 @@ export class ReportingStore {
           return;
         }

+        this.logger.error(err);
         throw err;
       });
     });
   }

-  private async saveReport(report: Report) {
-    const payload = report.payload as { objectType: string; layout: LayoutInstance };
+  /*
+   * Called from addReport, which handles any errors
+   */
+  private async indexReport(report: Report) {
+    const params = report.payload;
+
+    // Queuing is handled by TM. These queue-related fields are stored only for reference in the Report Info panel
+    const infoFields = {
+      timeout: report.timeout,
+      process_expiration: new Date(0), // use epoch so the job query works
+      created_at: new Date(),
+      attempts: 0,
+      max_attempts: report.max_attempts,
+      status: statuses.JOB_STATUS_PENDING,
+      browser_type: report.browser_type,
+    };

     const indexParams = {
       index: report._index,
-      id: report.id,
+      id: report._id,
       body: {
+        ...infoFields,
         jobtype: report.jobtype,
         meta: {
           // We are copying these values out of payload because these fields are indexed and can be aggregated on
           // for tracking stats, while payload contents are not.
-          objectType: payload.objectType,
-          layout: payload.layout ? payload.layout.id : 'none',
+          objectType: params.objectType,
+          layout: params.layout ? params.layout.id : 'none',
         },
         payload: report.payload,
         created_by: report.created_by,
-        timeout: report.timeout,
-        process_expiration: new Date(0), // use epoch so the job query works
-        created_at: new Date(),
-        attempts: 0,
-        max_attempts: report.max_attempts,
-        status: statuses.JOB_STATUS_PENDING,
-        browser_type: report.browser_type,
       },
     };
-    return this.client.callAsInternalUser('index', indexParams);
+    return await this.client.callAsInternalUser('index', indexParams);
   }

+  /*
+   * Called from addReport, which handles any errors
+   */
   private async refreshIndex(index: string) {
-    return this.client.callAsInternalUser('indices.refresh', { index });
+    return await this.client.callAsInternalUser('indices.refresh', { index });
   }

-  public async addReport(type: string, payload: unknown, options: AddReportOpts): Promise {
+  public async addReport(
+    type: string,
+    username: string | null,
+    payload: CreateJobBaseParams & CreateJobBaseParamsEncryptedFields
+  ): Promise {
     const timestamp = indexTimestamp(this.indexInterval);
     const index = `${this.indexPrefix}-${timestamp}`;
     await this.createIndex(index);

     const report = new Report({
-      index,
+      _index: index,
       payload,
       jobtype: type,
-      created_by: options.created_by,
-      browser_type: options.browser_type,
-      max_attempts: options.max_attempts,
-      timeout: options.timeout,
-      priority: 10, // unused
+      created_by: username,
+      ...this.jobSettings,
     });

-    const doc = await this.saveReport(report);
-    report.updateWithDoc(doc);
+    try {
+      const doc = await this.indexReport(report);
+      report.updateWithEsDoc(doc);

-    await this.refreshIndex(index);
-    this.logger.info(`Successfully queued pending job: ${report._index}/${report.id}`);
+      await this.refreshIndex(index);
+      this.logger.debug(`Successfully stored pending job: ${report._index}/${report._id}`);

-    return report;
+      return report;
+    } catch (err) {
+      this.logger.error(`Error in addReport!`);
+      this.logger.error(err);
+      throw err;
+    }
   }

-  public async updateReport(query: UpdateQuery): Promise {
-    return this.client.callAsInternalUser('update', {
-      index: query.index,
-      id: query.id,
-      if_seq_no: query.if_seq_no,
-      if_primary_term: query.if_primary_term,
-      body: { doc: query.body.doc },
-    });
+  public async setReportClaimed(report: Report, stats: Partial): Promise {
+    const doc = {
+      ...stats,
+      status: statuses.JOB_STATUS_PROCESSING,
+    };
+
+    try {
+      checkReportIsEditable(report);
+
+      return await this.client.callAsInternalUser('update', {
+        id: report._id,
+        index: report._index,
+        if_seq_no: report._seq_no,
+        if_primary_term: report._primary_term,
+        body: { doc },
+      });
+    } catch (err) {
+      this.logger.error('Error in setting report processing status!');
+      this.logger.error(err);
+      throw err;
+    }
+  }
+
+  public async setReportFailed(report: Report, stats: Partial): Promise {
+    const doc = {
+      ...stats,
+      status: statuses.JOB_STATUS_FAILED,
+    };
+
+    try {
+      checkReportIsEditable(report);
+
+      return await this.client.callAsInternalUser('update', {
+        id: report._id,
+        index: report._index,
+        if_seq_no: report._seq_no,
+        if_primary_term: report._primary_term,
+        body: { doc },
+      });
+    } catch (err) {
+      this.logger.error('Error in setting report failed status!');
+      this.logger.error(err);
+      throw err;
+    }
+  }
+
+  public async setReportCompleted(report: Report, stats: Partial): Promise {
+    try {
+      const { output } = stats as { output: any };
+      const status =
+        output && output.warnings && output.warnings.length > 0
+          ?
statuses.JOB_STATUS_WARNINGS + : statuses.JOB_STATUS_COMPLETED; + const doc = { + ...stats, + status, + }; + checkReportIsEditable(report); + + return await this.client.callAsInternalUser('update', { + id: report._id, + index: report._index, + if_seq_no: report._seq_no, + if_primary_term: report._primary_term, + body: { doc }, + }); + } catch (err) { + this.logger.error('Error in setting report complete status!'); + this.logger.error(err); + throw err; + } } } diff --git a/x-pack/plugins/reporting/server/plugin.ts b/x-pack/plugins/reporting/server/plugin.ts index cedc9dc14a2376..20e22c2db00e35 100644 --- a/x-pack/plugins/reporting/server/plugin.ts +++ b/x-pack/plugins/reporting/server/plugin.ts @@ -8,13 +8,7 @@ import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from 'src/core import { ReportingCore } from './'; import { initializeBrowserDriverFactory } from './browsers'; import { buildConfig, ReportingConfigType } from './config'; -import { - createQueueFactory, - enqueueJobFactory, - LevelLogger, - runValidations, - ReportingStore, -} from './lib'; +import { createQueueFactory, LevelLogger, runValidations, ReportingStore } from './lib'; import { registerRoutes } from './routes'; import { setFieldFormats } from './services'; import { ReportingSetup, ReportingSetupDeps, ReportingStart, ReportingStartDeps } from './types'; @@ -94,14 +88,12 @@ export class ReportingPlugin const browserDriverFactory = await initializeBrowserDriverFactory(config, logger); const store = new ReportingStore(reportingCore, logger); const esqueue = await createQueueFactory(reportingCore, store, logger); // starts polling for pending jobs - const enqueueJob = enqueueJobFactory(reportingCore, store, logger); // called from generation routes reportingCore.pluginStart({ browserDriverFactory, savedObjects: core.savedObjects, uiSettings: core.uiSettings, esqueue, - enqueueJob, store, }); diff --git a/x-pack/plugins/reporting/server/routes/generate_from_jobparams.ts b/x-pack/plugins/reporting/server/routes/generate_from_jobparams.ts index 2a12a64d67a354..f4959b56dfea1e 100644 --- a/x-pack/plugins/reporting/server/routes/generate_from_jobparams.ts +++ b/x-pack/plugins/reporting/server/routes/generate_from_jobparams.ts @@ -10,6 +10,7 @@ import { authorizedUserPreRoutingFactory } from './lib/authorized_user_pre_routi import { HandlerErrorFunction, HandlerFunction } from './types'; import { ReportingCore } from '../'; import { API_BASE_URL } from '../../common/constants'; +import { CreateJobBaseParams } from '../types'; const BASE_GENERATE = `${API_BASE_URL}/generate`; @@ -44,13 +45,13 @@ export function registerGenerateFromJobParams( }, }, userHandler(async (user, context, req, res) => { - let jobParamsRison: string | null; + let jobParamsRison: null | string = null; if (req.body) { - const { jobParams: jobParamsPayload } = req.body as { jobParams: string }; - jobParamsRison = jobParamsPayload; - } else { - const { jobParams: queryJobParams } = req.query as { jobParams: string }; + const { jobParams: jobParamsPayload } = req.body; + jobParamsRison = jobParamsPayload ? 
jobParamsPayload : null; + } else if (req.query?.jobParams) { + const { jobParams: queryJobParams } = req.query; if (queryJobParams) { jobParamsRison = queryJobParams; } else { @@ -65,11 +66,11 @@ export function registerGenerateFromJobParams( }); } - const { exportType } = req.params as { exportType: string }; + const { exportType } = req.params; let jobParams; try { - jobParams = rison.decode(jobParamsRison) as object | null; + jobParams = rison.decode(jobParamsRison) as CreateJobBaseParams | null; if (!jobParams) { return res.customError({ statusCode: 400, diff --git a/x-pack/plugins/reporting/server/routes/generate_from_savedobject_immediate.ts b/x-pack/plugins/reporting/server/routes/generate_from_savedobject_immediate.ts index 8250ca462049b7..a0a8f25de7fc49 100644 --- a/x-pack/plugins/reporting/server/routes/generate_from_savedobject_immediate.ts +++ b/x-pack/plugins/reporting/server/routes/generate_from_savedobject_immediate.ts @@ -5,16 +5,24 @@ */ import { schema } from '@kbn/config-schema'; +import { KibanaRequest } from 'src/core/server'; import { ReportingCore } from '../'; import { API_BASE_GENERATE_V1 } from '../../common/constants'; import { scheduleTaskFnFactory } from '../export_types/csv_from_savedobject/create_job'; import { runTaskFnFactory } from '../export_types/csv_from_savedobject/execute_job'; +import { JobParamsPostPayloadPanelCsv } from '../export_types/csv_from_savedobject/types'; import { LevelLogger as Logger } from '../lib'; import { TaskRunResult } from '../types'; import { authorizedUserPreRoutingFactory } from './lib/authorized_user_pre_routing'; import { getJobParamsFromRequest } from './lib/get_job_params_from_request'; import { HandlerErrorFunction } from './types'; +export type CsvFromSavedObjectRequest = KibanaRequest< + { savedObjectType: string; savedObjectId: string }, + unknown, + JobParamsPostPayloadPanelCsv +>; + /* * This function registers API Endpoints for immediate Reporting jobs. 
The API inputs are: * - saved object type and ID @@ -56,7 +64,7 @@ export function registerGenerateCsvFromSavedObjectImmediate( }), }, }, - userHandler(async (user, context, req, res) => { + userHandler(async (user, context, req: CsvFromSavedObjectRequest, res) => { const logger = parentLogger.clone(['savedobject-csv']); const jobParams = getJobParamsFromRequest(req, { isImmediate: true }); const scheduleTaskFn = scheduleTaskFnFactory(reporting, logger); diff --git a/x-pack/plugins/reporting/server/routes/generation.test.ts b/x-pack/plugins/reporting/server/routes/generation.test.ts index 87a696948ad84d..cef4da9aabbd47 100644 --- a/x-pack/plugins/reporting/server/routes/generation.test.ts +++ b/x-pack/plugins/reporting/server/routes/generation.test.ts @@ -138,8 +138,7 @@ describe('POST /api/reporting/generate', () => { }); it('returns 500 if job handler throws an error', async () => { - // throw an error from enqueueJob - core.getEnqueueJob = jest.fn().mockRejectedValue('Sorry, this tests says no'); + callClusterStub.withArgs('index').rejects('silly'); registerJobGenerationRoutes(core, mockLogger); diff --git a/x-pack/plugins/reporting/server/routes/generation.ts b/x-pack/plugins/reporting/server/routes/generation.ts index 017e875931ae2c..b2115076aada4c 100644 --- a/x-pack/plugins/reporting/server/routes/generation.ts +++ b/x-pack/plugins/reporting/server/routes/generation.ts @@ -10,6 +10,7 @@ import { kibanaResponseFactory } from 'src/core/server'; import { ReportingCore } from '../'; import { API_BASE_URL } from '../../common/constants'; import { LevelLogger as Logger } from '../lib'; +import { enqueueJobFactory } from '../lib/enqueue_job'; import { registerGenerateFromJobParams } from './generate_from_jobparams'; import { registerGenerateCsvFromSavedObjectImmediate } from './generate_from_savedobject_immediate'; import { HandlerFunction } from './types'; @@ -43,11 +44,10 @@ export function registerJobGenerationRoutes(reporting: ReportingCore, logger: Lo } try { - const enqueueJob = await reporting.getEnqueueJob(); - const job = await enqueueJob(exportTypeId, jobParams, user, context, req); + const enqueueJob = enqueueJobFactory(reporting, logger); + const report = await enqueueJob(exportTypeId, jobParams, user, context, req); // return the queue's job information - const jobJson = job.toJSON(); const downloadBaseUrl = getDownloadBaseUrl(reporting); return res.ok({ @@ -55,8 +55,8 @@ export function registerJobGenerationRoutes(reporting: ReportingCore, logger: Lo 'content-type': 'application/json', }, body: { - path: `${downloadBaseUrl}/${jobJson.id}`, - job: jobJson, + path: `${downloadBaseUrl}/${report._id}`, + job: report.toApiJSON(), }, }); } catch (err) { diff --git a/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts b/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts index d384cbb878a0ed..84a98d6d1f1d7a 100644 --- a/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts +++ b/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts @@ -8,8 +8,7 @@ import contentDisposition from 'content-disposition'; import { get } from 'lodash'; import { CSV_JOB_TYPE } from '../../../common/constants'; -import { statuses } from '../../lib/esqueue/constants/statuses'; -import { ExportTypesRegistry } from '../../lib/export_types_registry'; +import { ExportTypesRegistry, statuses } from '../../lib'; import { ExportTypeDefinition, JobSource, TaskRunResult } from '../../types'; type ExportTypeType = ExportTypeDefinition; @@ -18,11 +17,11 @@ 
interface ErrorFromPayload { message: string; } -// A camelCase version of TaskRunResult +// interface of the API result interface Payload { statusCode: number; content: string | Buffer | ErrorFromPayload; - contentType: string; + contentType: string | null; headers: Record; } diff --git a/x-pack/plugins/reporting/server/routes/lib/get_job_params_from_request.ts b/x-pack/plugins/reporting/server/routes/lib/get_job_params_from_request.ts index e5c1f382413497..bfa15a4022a4d6 100644 --- a/x-pack/plugins/reporting/server/routes/lib/get_job_params_from_request.ts +++ b/x-pack/plugins/reporting/server/routes/lib/get_job_params_from_request.ts @@ -4,21 +4,15 @@ * you may not use this file except in compliance with the Elastic License. */ -import { KibanaRequest } from 'src/core/server'; -import { - JobParamsPanelCsv, - JobParamsPostPayloadPanelCsv, -} from '../../export_types/csv_from_savedobject/types'; +import { JobParamsPanelCsv } from '../../export_types/csv_from_savedobject/types'; +import { CsvFromSavedObjectRequest } from '../generate_from_savedobject_immediate'; export function getJobParamsFromRequest( - request: KibanaRequest, + request: CsvFromSavedObjectRequest, opts: { isImmediate: boolean } ): JobParamsPanelCsv { - const { savedObjectType, savedObjectId } = request.params as { - savedObjectType: string; - savedObjectId: string; - }; - const { timerange, state } = request.body as JobParamsPostPayloadPanelCsv; + const { savedObjectType, savedObjectId } = request.params; + const { timerange, state } = request.body; const post = timerange || state ? { timerange, state } : undefined; diff --git a/x-pack/plugins/reporting/server/routes/types.d.ts b/x-pack/plugins/reporting/server/routes/types.d.ts index 607ce34ab94652..c92c9fb7eef749 100644 --- a/x-pack/plugins/reporting/server/routes/types.d.ts +++ b/x-pack/plugins/reporting/server/routes/types.d.ts @@ -6,12 +6,12 @@ import { KibanaRequest, KibanaResponseFactory, RequestHandlerContext } from 'src/core/server'; import { AuthenticatedUser } from '../../../security/server'; -import { ScheduledTaskParams } from '../types'; +import { CreateJobBaseParams, ScheduledTaskParams } from '../types'; export type HandlerFunction = ( user: AuthenticatedUser | null, exportType: string, - jobParams: object, + jobParams: CreateJobBaseParams, context: RequestHandlerContext, req: KibanaRequest, res: KibanaResponseFactory diff --git a/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts b/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts index 95b06aa39f07e4..c508ee6974ca00 100644 --- a/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts +++ b/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts @@ -8,7 +8,6 @@ jest.mock('../routes'); jest.mock('../usage'); jest.mock('../browsers'); jest.mock('../lib/create_queue'); -jest.mock('../lib/enqueue_job'); jest.mock('../lib/validate'); import * as Rx from 'rxjs'; @@ -19,10 +18,9 @@ import { initializeBrowserDriverFactory, } from '../browsers'; import { ReportingInternalSetup, ReportingInternalStart } from '../core'; -import { ReportingStartDeps } from '../types'; import { ReportingStore } from '../lib'; +import { ReportingStartDeps } from '../types'; import { createMockLevelLogger } from './create_mock_levellogger'; -import { Report } from '../lib/store'; (initializeBrowserDriverFactory as jest.Mock< Promise @@ -30,10 +28,13 @@ import { Report } from '../lib/store'; (chromium as 
any).createDriverFactory.mockImplementation(() => ({})); -const createMockPluginSetup = (setupMock?: any): ReportingInternalSetup => { +const createMockPluginSetup = ( + mockReportingCore: ReportingCore, + setupMock?: any +): ReportingInternalSetup => { return { elasticsearch: setupMock.elasticsearch || { legacy: { client: {} } }, - basePath: setupMock.basePath, + basePath: setupMock.basePath || '/all-about-that-basepath', router: setupMock.router, security: setupMock.security, licensing: { license$: Rx.of({ isAvailable: true, isActive: true, type: 'basic' }) } as any, @@ -48,7 +49,6 @@ const createMockPluginStart = ( const store = new ReportingStore(mockReportingCore, logger); return { browserDriverFactory: startMock.browserDriverFactory, - enqueueJob: startMock.enqueueJob || jest.fn().mockResolvedValue(new Report({} as any)), esqueue: startMock.esqueue, savedObjects: startMock.savedObjects || { getScopedClient: jest.fn() }, uiSettings: startMock.uiSettings || { asScopedToClient: () => ({ get: jest.fn() }) }, @@ -72,15 +72,14 @@ export const createMockReportingCore = async ( setupDepsMock: ReportingInternalSetup | undefined = undefined, startDepsMock: ReportingInternalStart | undefined = undefined ) => { - if (!setupDepsMock) { - setupDepsMock = createMockPluginSetup({}); - } - const mockReportingCore = { getConfig: () => config, getElasticsearchService: () => setupDepsMock?.elasticsearch, } as ReportingCore; + if (!setupDepsMock) { + setupDepsMock = createMockPluginSetup(mockReportingCore, {}); + } if (!startDepsMock) { startDepsMock = createMockPluginStart(mockReportingCore, {}); } diff --git a/x-pack/plugins/reporting/server/types.ts b/x-pack/plugins/reporting/server/types.ts index ff597b53ea0b0b..c9649cb6e558b2 100644 --- a/x-pack/plugins/reporting/server/types.ts +++ b/x-pack/plugins/reporting/server/types.ts @@ -19,24 +19,12 @@ import { LevelLogger } from './lib'; import { LayoutInstance } from './lib/layouts'; /* - * Routing / API types + * Routing types */ -interface ListQuery { - page: string; - size: string; - ids?: string; // optional field forbids us from extending RequestQuery -} - -interface GenerateQuery { - jobParams: string; -} - -export type ReportingRequestQuery = ListQuery | GenerateQuery; - export interface ReportingRequestPre { management: { - jobTypes: any; + jobTypes: string[]; }; user: string; } @@ -54,12 +42,14 @@ export interface TimeRangeParams { max?: Date | string | number | null; } +// the "raw" data coming from the client, unencrypted export interface JobParamPostPayload { timerange?: TimeRangeParams; } +// the pre-processed, encrypted data ready for storage export interface ScheduledTaskParams { - headers?: string; // serialized encrypted headers + headers: string; // serialized encrypted headers jobParams: JobParamsType; title: string; type: string; @@ -77,10 +67,10 @@ export interface JobSource { } export interface TaskRunResult { - content_type: string; + content_type: string | null; content: string | null; - size: number; csv_contains_formulas?: boolean; + size: number; max_size_reached?: boolean; warnings?: string[]; } @@ -177,17 +167,29 @@ export type ReportingSetup = object; export type CaptureConfig = ReportingConfigType['capture']; export type ScrollConfig = ReportingConfigType['csv']['scroll']; -export type ESQueueCreateJobFn = ( +export interface CreateJobBaseParams { + browserTimezone: string; + layout?: LayoutInstance; // for screenshot type reports + objectType: string; +} + +export interface CreateJobBaseParamsEncryptedFields extends 
CreateJobBaseParams { + basePath?: string; // for screenshot type reports + headers: string; // encrypted headers +} + +export type CreateJobFn = ( jobParams: JobParamsType, context: RequestHandlerContext, request: KibanaRequest -) => Promise; +) => Promise; -export type ESQueueWorkerExecuteFn = ( +// rename me +export type WorkerExecuteFn = ( jobId: string, job: ScheduledTaskParamsType, cancellationToken: CancellationToken -) => Promise; +) => Promise; export type ScheduleTaskFnFactory = ( reporting: ReportingCore,