Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Reporting] remove unused "immediate export" abstractions from esqueue worker cycles #65375

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 13 additions & 30 deletions x-pack/legacy/plugins/reporting/server/lib/create_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,13 @@ import {
ESQueueInstance,
ESQueueWorkerExecuteFn,
ExportTypeDefinition,
ImmediateExecuteFn,
JobDocPayload,
JobSource,
Logger,
RequestFacade,
} from '../../types';
// @ts-ignore untyped dependency
import { events as esqueueEvents } from './esqueue';

export function createWorkerFactory<JobParamsType>(reporting: ReportingCore, logger: Logger) {
type JobDocPayloadType = JobDocPayload<JobParamsType>;

const config = reporting.getConfig();
const queueConfig = config.get('queue');
const kibanaName = config.kbnConfig.get('server', 'name');
Expand All @@ -31,48 +26,36 @@ export function createWorkerFactory<JobParamsType>(reporting: ReportingCore, log
// Once more document types are added, this will need to be passed in
return async function createWorker(queue: ESQueueInstance) {
// export type / execute job map
const jobExecutors: Map<
string,
ImmediateExecuteFn<JobParamsType> | ESQueueWorkerExecuteFn<JobDocPayloadType>
> = new Map();
const jobExecutors: Map<string, ESQueueWorkerExecuteFn<unknown>> = new Map();

for (const exportType of reporting.getExportTypesRegistry().getAll() as Array<
ExportTypeDefinition<
JobParamsType,
unknown,
unknown,
ImmediateExecuteFn<JobParamsType> | ESQueueWorkerExecuteFn<JobDocPayloadType>
>
ExportTypeDefinition<JobParamsType, unknown, unknown, ESQueueWorkerExecuteFn<unknown>>
>) {
const jobExecutor = await exportType.executeJobFactory(reporting, logger); // FIXME: does not "need" to be async
jobExecutors.set(exportType.jobType, jobExecutor);
}

const workerFn = (jobSource: JobSource<JobParamsType>, ...workerRestArgs: any[]) => {
const workerFn = <ScheduledTaskParamsType>(
jobSource: JobSource<ScheduledTaskParamsType>,
jobParams: ScheduledTaskParamsType,
cancellationToken: CancellationToken
) => {
const {
_id: jobId,
_source: { jobtype: jobType },
} = jobSource;

if (!jobId) {
throw new Error(`Claimed job is missing an ID!: ${JSON.stringify(jobSource)}`);
}

const jobTypeExecutor = jobExecutors.get(jobType);
// pass the work to the jobExecutor
if (!jobTypeExecutor) {
throw new Error(`Unable to find a job executor for the claimed job: [${jobId}]`);
}

if (jobId) {
const jobExecutorWorker = jobTypeExecutor as ESQueueWorkerExecuteFn<JobDocPayloadType>;
return jobExecutorWorker(
jobId,
...(workerRestArgs as [JobDocPayloadType, CancellationToken])
);
} else {
const jobExecutorImmediate = jobExecutors.get(jobType) as ImmediateExecuteFn<JobParamsType>;
return jobExecutorImmediate(
null,
...(workerRestArgs as [JobDocPayload<JobParamsType>, RequestFacade])
);
}
// pass the work to the jobExecutor
return jobTypeExecutor(jobId, jobParams, cancellationToken);
};

const workerOptions = {
Expand Down
3 changes: 1 addition & 2 deletions x-pack/legacy/plugins/reporting/server/lib/enqueue_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
ConditionalHeaders,
EnqueueJobFn,
ESQueueCreateJobFn,
ImmediateCreateJobFn,
Job,
Logger,
RequestFacade,
Expand Down Expand Up @@ -40,7 +39,7 @@ export function enqueueJobFactory(reporting: ReportingCore, parentLogger: Logger
headers: ConditionalHeaders['headers'],
request: RequestFacade
): Promise<Job> {
type CreateJobFn = ESQueueCreateJobFn<JobParamsType> | ImmediateCreateJobFn<JobParamsType>;
type CreateJobFn = ESQueueCreateJobFn<JobParamsType>;

const esqueue = await reporting.getEsqueue();
const exportType = reporting.getExportTypesRegistry().getById(exportTypeId);
Expand Down