Skip to content

Commit

Permalink
TEMP COMMIT. REBASE ME
Browse files Browse the repository at this point in the history
  • Loading branch information
banderror committed May 4, 2021
1 parent f450b94 commit 227f73f
Show file tree
Hide file tree
Showing 15 changed files with 635 additions and 585 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ interface BufferItem {
interface ConstructorParams {
indexName: string;
elasticsearch: Promise<ElasticsearchClient>;
isWriteEnabled: boolean;
logger: Logger;
}

Expand All @@ -28,12 +29,14 @@ export type IIndexWriter = PublicMethodsOf<IndexWriter>;
export class IndexWriter {
private readonly indexName: string;
private readonly elasticsearch: Promise<ElasticsearchClient>;
private readonly isWriteEnabled: boolean;
private readonly logger: Logger;
private readonly buffer: BufferedStream<BufferItem>;

constructor(params: ConstructorParams) {
this.indexName = params.indexName;
this.elasticsearch = params.elasticsearch;
this.isWriteEnabled = params.isWriteEnabled;
this.logger = params.logger.get('IndexWriter');

this.buffer = new BufferedStream<BufferItem>({
Expand All @@ -42,13 +45,17 @@ export class IndexWriter {
}

public indexOne(doc: Document): void {
this.buffer.enqueue({ index: this.indexName, doc });
if (this.isWriteEnabled) {
this.buffer.enqueue({ index: this.indexName, doc });
}
}

public indexMany(docs: Document[]): void {
docs.forEach((doc) => {
this.buffer.enqueue({ index: this.indexName, doc });
});
if (this.isWriteEnabled) {
docs.forEach((doc) => {
this.buffer.enqueue({ index: this.indexName, doc });
});
}
}

public async close(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ export interface IndexParams {

/** @example 'mylog' */
logName: string;

/** @example 'default' */
spaceId: string;
}

export interface IndexNames extends IndexParams {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,11 @@ export class EventLogBootstrapper {
this.bootstrappingFinished.signal({ success });
}

public getProcess(): EventLogBootstrappingProcess {
return {
waitUntilFinished: () => this.bootstrappingFinished.wait(),
};
public waitUntilFinished(): Promise<EventLogBootstrappingResult> {
return this.bootstrappingFinished.wait();
}
}

export interface EventLogBootstrappingProcess {
waitUntilFinished: () => Promise<EventLogBootstrappingResult>;
}

export interface EventLogBootstrappingResult {
success: boolean;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { ILMPolicy, defaultIlmPolicy } from '../elasticsearch';
import { EventSchema, FieldMap, Schema } from '../event_schema';
import { EventLogOptions, IEventLogDefinition } from './public_api';

export class EventLogDefinition<TMap extends FieldMap> implements IEventLogDefinition<TMap> {
public readonly eventLogName: string;
public readonly eventSchema: EventSchema<TMap>;
public readonly ilmPolicy: ILMPolicy;

constructor(options: EventLogOptions<TMap>) {
this.eventLogName = options.name;
this.eventSchema = options.schema;
this.ilmPolicy = options.ilmPolicy ?? defaultIlmPolicy;
}

public defineChild<TExtMap extends FieldMap = TMap>(
options: EventLogOptions<TExtMap>
): IEventLogDefinition<TMap & TExtMap> {
// TODO: validate name (can't contain "-" and ".")
const childName = `${this.eventLogName}.${options.name}`;
const childSchema = Schema.combine(this.eventSchema, options.schema);
const childPolicy = options.ilmPolicy ?? this.ilmPolicy;

return new EventLogDefinition({
name: childName,
schema: childSchema,
ilmPolicy: childPolicy,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,40 @@
*/

import { Logger } from 'kibana/server';
import { IndexSpecification, IndexNames, IIndexWriter, IIndexReader } from '../elasticsearch';
import { Schema, EventSchema, FieldMap, Event } from '../event_schema';
import { IEventLogRegistry } from './internal_api';
import { EventLogOptions, IEventLog, IEventLogProvider } from './public_api';
import { EventLogBootstrappingProcess } from './event_log_bootstrapper';
import { IndexSpecification, IIndexWriter, IIndexReader } from '../elasticsearch';
import { EventSchema, FieldMap, Event } from '../event_schema';
import { IEventLog, IEventLogProvider } from './public_api';
import { EventLogBootstrapper } from './event_log_bootstrapper';
import { EventLog } from './event_log';

interface ConstructorParams<TMap extends FieldMap> {
eventSchema: EventSchema<TMap>;
indexSpec: IndexSpecification;
indexReader: IIndexReader;
indexWriter: IIndexWriter;
isWriteEnabled: boolean;
registry: IEventLogRegistry;
logBootstrapper: EventLogBootstrapper;
logger: Logger;
bootstrapping: EventLogBootstrappingProcess;
}

export class EventLogProvider<TMap extends FieldMap> implements IEventLogProvider<TMap> {
private readonly eventSchema: EventSchema<TMap>;
private readonly indexSpec: IndexSpecification;
private readonly indexReader: IIndexReader;
private readonly indexWriter: IIndexWriter;
private readonly isWriteEnabled: boolean;
private readonly registry: IEventLogRegistry;
private readonly logger: Logger; // TODO: use or remove
private readonly logBootstrapping: EventLogBootstrappingProcess;
private readonly logBootstrapper: EventLogBootstrapper;
private log: EventLog<Event<TMap>> | null;
private isIndexBootstrapped: boolean;

constructor(params: ConstructorParams<TMap>) {
this.eventSchema = params.eventSchema;
this.indexSpec = params.indexSpec;
this.indexReader = params.indexReader;
this.indexWriter = params.indexWriter;
this.isWriteEnabled = params.isWriteEnabled;
this.registry = params.registry;
this.logger = params.logger.get('EventLogProvider');
this.logBootstrapping = params.bootstrapping;
this.logBootstrapper = params.logBootstrapper;
this.log = null;
this.isIndexBootstrapped = false;
}

public getEventSchema(): EventSchema<TMap> {
Expand All @@ -59,50 +54,30 @@ export class EventLogProvider<TMap extends FieldMap> implements IEventLogProvide
return this.indexSpec.indexNames.logName;
}

public async getLog(): Promise<IEventLog<Event<TMap>>> {
if (this.log) {
return this.log;
}

public async getLog(bootstrapIndex: boolean = true): Promise<IEventLog<Event<TMap>>> {
const { indexNames } = this.indexSpec;
const { logName } = indexNames;
const { success } = await this.logBootstrapping.waitUntilFinished();

if (!success) {
// TODO: or rather log (to console) and return an "empty" EventLog (null object)?
throw new Error(`Event log bootstrapping failed, logName="${logName}"`);
if (bootstrapIndex && !this.isIndexBootstrapped) {
this.logBootstrapper.start();
const { success } = await this.logBootstrapper.waitUntilFinished();
this.isIndexBootstrapped = success;

if (!success) {
// TODO: or rather log (to console) and return an "empty" EventLog (null object)?
throw new Error(`Event log bootstrapping failed, logName="${logName}"`);
}
}

if (!this.log) {
this.log = new EventLog<Event<TMap>>({
indexNames,
indexReader: this.indexReader,
indexWriter: this.indexWriter,
isWriteEnabled: this.isWriteEnabled,
logger: this.logger,
});
}

return this.log;
}

public async closeLog(): Promise<void> {
await this.indexWriter.close();
}

public registerLog<TExtMap extends FieldMap>(
options: EventLogOptions<TExtMap>
): IEventLogProvider<TMap & TExtMap> {
const { indexNames, ilmPolicy } = this.indexSpec;

const childLogName = IndexNames.createChildLogName(indexNames, options.name);
const childSchema = Schema.combine(this.eventSchema, options.schema);
const childIlmPolicy = options.ilmPolicy ?? ilmPolicy;

return this.registry.register({
name: childLogName,
schema: childSchema,
ilmPolicy: childIlmPolicy,
});
}
}
Loading

0 comments on commit 227f73f

Please sign in to comment.