diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/InstrumentDescriptor.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/InstrumentDescriptor.ts index d62b0710a6..8aa57c9f68 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/InstrumentDescriptor.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/InstrumentDescriptor.ts @@ -16,6 +16,8 @@ import { MetricOptions, ValueType } from '@opentelemetry/api-metrics-wip'; import { InstrumentType } from './Instruments'; +import { View } from './view/View'; + export interface InstrumentDescriptor { readonly name: string; @@ -34,3 +36,13 @@ export function createInstrumentDescriptor(name: string, type: InstrumentType, o valueType: options?.valueType ?? ValueType.DOUBLE, }; } + +export function createInstrumentDescriptorWithView(view: View, instrument: InstrumentDescriptor): InstrumentDescriptor { + return { + name: view.name ?? instrument.name, + description: view.description ?? instrument.description, + type: instrument.type, + unit: instrument.unit, + valueType: instrument.valueType, + }; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts index 824af76ca3..0e058447dd 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts @@ -20,12 +20,17 @@ import { createInstrumentDescriptor, InstrumentDescriptor } from './InstrumentDe import { Counter, Histogram, InstrumentType, UpDownCounter } from './Instruments'; import { MeterProviderSharedState } from './state/MeterProviderSharedState'; import { MultiMetricStorage } from './state/MultiWritableMetricStorage'; -import { NoopWritableMetricStorage, WritableMetricStorage } from './state/WritableMetricStorage'; +import { SyncMetricStorage } from './state/SyncMetricStorage'; +import { MetricStorage } from './state/MetricStorage'; +import { MetricData } from './export/MetricData'; +import { isNotNullish } from './utils'; +import { MetricCollectorHandle } from './state/MetricCollector'; +import { HrTime } from '@opentelemetry/api'; // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#meter export class Meter implements metrics.Meter { - private _metricStorageRegistry = new Map(); + private _metricStorageRegistry = new Map(); // instrumentation library required by spec to be on meter // spec requires provider config changes to apply to previously created meters, achieved by holding a reference to the provider @@ -80,9 +85,8 @@ export class Meter implements metrics.Meter { private _registerMetricStorage(descriptor: InstrumentDescriptor) { const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary); - const storages = views.map(_view => { - // TODO: create actual metric storages. - const storage = new NoopWritableMetricStorage(); + const storages = views.map(view => { + const storage = SyncMetricStorage.create(view, descriptor); // TODO: handle conflicts this._metricStorageRegistry.set(descriptor.name, storage); return storage; @@ -92,4 +96,17 @@ export class Meter implements metrics.Meter { } return new MultiMetricStorage(storages); } + + async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise { + const result = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => { + return metricStorage.collect( + collector, + this._meterProviderSharedState.metricCollectors, + this._meterProviderSharedState.resource, + this._instrumentationLibrary, + this._meterProviderSharedState.sdkStartTime, + collectionTime); + })); + return result.filter(isNotNullish); + } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts index 28b66806ba..62c8892cbe 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/MeterProvider.ts @@ -18,13 +18,12 @@ import * as api from '@opentelemetry/api'; import * as metrics from '@opentelemetry/api-metrics-wip'; import { Resource } from '@opentelemetry/resources'; import { Meter } from './Meter'; -import { MetricExporter } from './MetricExporter'; -import { MetricReader } from './MetricReader'; +import { MetricReader } from './export/MetricReader'; import { MeterProviderSharedState } from './state/MeterProviderSharedState'; import { InstrumentSelector } from './view/InstrumentSelector'; import { MeterSelector } from './view/MeterSelector'; import { View } from './view/View'; - +import { MetricCollector } from './state/MetricCollector'; // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#meterprovider @@ -35,8 +34,6 @@ export type MeterProviderOptions = { export class MeterProvider { private _sharedState: MeterProviderSharedState; private _shutdown = false; - private _metricReaders: MetricReader[] = []; - private _metricExporters: MetricExporter[] = []; constructor(options: MeterProviderOptions) { this._sharedState = new MeterProviderSharedState(options.resource ?? Resource.empty()); @@ -57,7 +54,9 @@ export class MeterProvider { } addMetricReader(metricReader: MetricReader) { - this._metricReaders.push(metricReader); + const collector = new MetricCollector(this._sharedState, metricReader); + metricReader.setMetricProducer(collector); + this._sharedState.metricCollectors.push(collector); } addView(view: View, instrumentSelector: InstrumentSelector, meterSelector: MeterSelector) { @@ -66,7 +65,8 @@ export class MeterProvider { } /** - * Flush all buffered data and shut down the MeterProvider and all exporters and metric readers. + * Flush all buffered data and shut down the MeterProvider and all registered + * MetricReaders. * Returns a promise which is resolved when all flushes are complete. * * TODO: return errors to caller somehow? @@ -82,12 +82,11 @@ export class MeterProvider { // TODO add a timeout - spec leaves it up the the SDK if this is configurable this._shutdown = true; - // Shut down all exporters and readers. - // Log all Errors. - for (const exporter of this._metricExporters) { + for (const collector of this._sharedState.metricCollectors) { try { - await exporter.shutdown(); + await collector.shutdown(); } catch (e) { + // Log all Errors. if (e instanceof Error) { api.diag.error(`Error shutting down: ${e.message}`) } @@ -96,7 +95,7 @@ export class MeterProvider { } /** - * Notifies all exporters and metric readers to flush any buffered data. + * Notifies all registered MetricReaders to flush any buffered data. * Returns a promise which is resolved when all flushes are complete. * * TODO: return errors to caller somehow? @@ -112,10 +111,11 @@ export class MeterProvider { return; } - for (const exporter of [...this._metricExporters, ...this._metricReaders]) { + for (const collector of this._sharedState.metricCollectors) { try { - await exporter.forceFlush(); + await collector.forceFlush(); } catch (e) { + // Log all Errors. if (e instanceof Error) { api.diag.error(`Error flushing: ${e.message}`) } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/MetricExporter.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/MetricExporter.ts deleted file mode 100644 index 4046f04136..0000000000 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/MetricExporter.ts +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricexporter - -// TODO should this just be an interface and exporters can implement their own shutdown? -export abstract class MetricExporter { - protected _shutdown = false; - - // TODO: define the methods that actually export - must allow for push and pull exporters - - async shutdown(): Promise { - if (this._shutdown) { - return; - } - - // Setting _shutdown before flushing might prevent some exporters from flushing - // Waiting until flushing is complete might allow another flush to occur during shutdown - const flushPromise = this.forceFlush(); - this._shutdown = true; - await flushPromise; - } - - abstract forceFlush(): Promise; - - isShutdown() { - return this._shutdown; - } -} - -export class ConsoleMetricExporter extends MetricExporter { - async export() { - throw new Error('Method not implemented'); - } - - // nothing to do - async forceFlush() {} -} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/MetricReader.ts deleted file mode 100644 index 9f20299a4f..0000000000 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/MetricReader.ts +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { MetricExporter } from '.'; - -// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader - -export class MetricReader { - private _shutdown = false; - - constructor(private _exporter: MetricExporter) {} - - async shutdown(): Promise { - if (this._shutdown) { - return; - } - - this._shutdown = true; - // errors thrown to caller - await this._exporter.shutdown(); - } - - async forceFlush(): Promise { - if (this._shutdown) { - return; - } - - // errors thrown to caller - await this._exporter.forceFlush(); - } -} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts new file mode 100644 index 0000000000..697f020b28 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricExporter.ts @@ -0,0 +1,61 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { AggregationTemporality } from './AggregationTemporality'; +import { MetricData } from './MetricData'; + + +// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricexporter + +// TODO should this just be an interface and exporters can implement their own shutdown? +export abstract class MetricExporter { + protected _shutdown = false; + + abstract export(batch: MetricData[]): Promise; + + abstract forceFlush(): Promise; + + abstract getPreferredAggregationTemporality(): AggregationTemporality; + + async shutdown(): Promise { + if (this._shutdown) { + return; + } + + // Setting _shutdown before flushing might prevent some exporters from flushing + // Waiting until flushing is complete might allow another flush to occur during shutdown + const flushPromise = this.forceFlush(); + this._shutdown = true; + await flushPromise; + } + + isShutdown() { + return this._shutdown; + } +} + +export class ConsoleMetricExporter extends MetricExporter { + async export(_batch: MetricData[]) { + throw new Error('Method not implemented'); + } + + getPreferredAggregationTemporality() { + return AggregationTemporality.CUMULATIVE; + } + + // nothing to do + async forceFlush() {} +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts new file mode 100644 index 0000000000..7afb9eb4ea --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricProducer.ts @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { MetricData } from './MetricData'; + +/** + * This is a public interface that represent an export state of a MetricReader. + */ +export interface MetricProducer { + collect(): Promise; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts new file mode 100644 index 0000000000..2d4cd5c0a7 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -0,0 +1,65 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { AggregationTemporality } from './AggregationTemporality'; +import { MetricExporter } from './MetricExporter'; +import { MetricProducer } from './MetricProducer'; + +// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader + +export abstract class MetricReader { + private _shutdown = false; + private _metricProducer?: MetricProducer; + + constructor(private _exporter: MetricExporter) {} + + setMetricProducer(metricProducer: MetricProducer) { + this._metricProducer = metricProducer; + } + + getPreferredAggregationTemporality(): AggregationTemporality { + return this._exporter.getPreferredAggregationTemporality(); + } + + async collect(): Promise { + if (this._metricProducer === undefined) { + throw new Error('MetricReader is not bound to a MeterProvider'); + } + const metrics = await this._metricProducer.collect(); + + // errors thrown to caller + await this._exporter.export(metrics); + } + + async shutdown(): Promise { + if (this._shutdown) { + return; + } + + this._shutdown = true; + // errors thrown to caller + await this._exporter.shutdown(); + } + + async forceFlush(): Promise { + if (this._shutdown) { + return; + } + + // errors thrown to caller + await this._exporter.forceFlush(); + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts index ef6f9b17d2..35622792a2 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts @@ -15,5 +15,5 @@ */ export { MeterProvider, MeterProviderOptions } from './MeterProvider'; -export * from './MetricExporter'; -export * from './MetricReader'; +export * from './export/MetricExporter'; +export * from './export/MetricReader'; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricProcessor.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricProcessor.ts new file mode 100644 index 0000000000..0e1325ee71 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricProcessor.ts @@ -0,0 +1,52 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Context } from '@opentelemetry/api'; +import { Attributes } from '@opentelemetry/api-metrics-wip'; +import { Maybe } from '../utils'; +import { Accumulation, Aggregator } from '../aggregator/types'; +import { AttributeHashMap } from './HashMap'; + +/** + * Internal interface. + * + * Allows synchronous collection of metrics. This processor should allow + * allocation of new aggregation cells for metrics and convert cumulative + * recording to delta data points. + */ +export class DeltaMetricProcessor> { + private _activeCollectionStorage: AttributeHashMap; + + constructor(private _aggregator: Aggregator) { + this._activeCollectionStorage = new AttributeHashMap(); + } + + /** Bind an efficient storage handle for a set of attributes. */ + private bind(attributes: Attributes) { + return this._activeCollectionStorage.getOrDefault(attributes, () => this._aggregator.createAccumulation()); + } + + record(value: number, attributes: Attributes, _context: Context) { + const accumulation = this.bind(attributes); + accumulation?.record(value); + } + + collect() { + const unreportedDelta = this._activeCollectionStorage; + this._activeCollectionStorage = new AttributeHashMap(); + return unreportedDelta; + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/HashMap.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/HashMap.ts new file mode 100644 index 0000000000..fb8bd4a85a --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/HashMap.ts @@ -0,0 +1,75 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Attributes } from '@opentelemetry/api-metrics-wip'; +import { hashAttributes } from '../utils'; + +export interface Hash { + (value: ValueType): HashCodeType; +} + +export class HashMap { + private _valueMap = new Map(); + private _keyMap = new Map(); + + constructor(private _hash: Hash) {} + + get(key: KeyType, hashCode?: HashCodeType) { + hashCode ??= this._hash(key); + return this._valueMap.get(hashCode); + } + + getOrDefault(key: KeyType, defaultFactory: () => ValueType) { + const hash = this._hash(key); + if (this._valueMap.has(hash)) { + return this._valueMap.get(hash); + } + const val = defaultFactory(); + if (!this._keyMap.has(hash)) { + this._keyMap.set(hash, key); + } + this._valueMap.set(hash, val); + return val; + } + + set(key: KeyType, value: ValueType, hashCode?: HashCodeType) { + hashCode ??= this._hash(key); + if (!this._keyMap.has(hashCode)) { + this._keyMap.set(hashCode, key); + } + this._valueMap.set(hashCode, value); + } + + *entries(): IterableIterator<[KeyType, ValueType, HashCodeType]> { + const valueIterator = this._valueMap.entries(); + let next = valueIterator.next(); + while (next.done !== true) { + /** next.value[0] here can not be undefined */ + yield [ this._keyMap.get(next.value[0])!, next.value[1], next.value[0]]; + next = valueIterator.next(); + } + } + + get size() { + return this._valueMap.size; + } +} + +export class AttributeHashMap extends HashMap { + constructor() { + super(hashAttributes); + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts index 57a2b022b0..2f59eade1f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts @@ -14,14 +14,23 @@ * limitations under the License. */ +import { HrTime } from '@opentelemetry/api'; +import { hrTime } from '@opentelemetry/core'; import { Resource } from '@opentelemetry/resources'; +import { Meter } from '../Meter'; import { ViewRegistry } from '../view/ViewRegistry'; +import { MetricCollector } from './MetricCollector'; /** * An internal record for shared meter provider states. */ export class MeterProviderSharedState { viewRegistry = new ViewRegistry(); + readonly sdkStartTime: HrTime = hrTime(); + + metricCollectors: MetricCollector[] = []; + + meters: Map = new Map(); constructor(public resource: Resource) {} } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts new file mode 100644 index 0000000000..c1bea5e369 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -0,0 +1,64 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { hrTime } from '@opentelemetry/core'; +import { AggregationTemporality } from '../export/AggregationTemporality'; +import { MetricData } from '../export/MetricData'; +import { MetricProducer } from '../export/MetricProducer'; +import { MetricReader } from '../export/MetricReader'; +import { MeterProviderSharedState } from './MeterProviderSharedState'; + +/** + * An internal opaque interface that the MetricReader receives as + * MetricProducer. It acts as the storage key to the internal metric stream + * state for each MetricReader. + */ +export class MetricCollector implements MetricProducer { + public readonly aggregatorTemporality: AggregationTemporality; + constructor(private _sharedState: MeterProviderSharedState, private _metricReader: MetricReader) { + this.aggregatorTemporality = this._metricReader.getPreferredAggregationTemporality(); + } + + async collect(): Promise { + const collectionTime = hrTime(); + const results = await Promise.all(Array.from(this._sharedState.meters.values()) + .map(meter => meter.collect(this, collectionTime))); + + return results.reduce((cumulation, current) => cumulation.concat(current), []); + } + + /** + * Delegates for MetricReader.forceFlush. + */ + async forceFlush(): Promise { + return this._metricReader.forceFlush(); + } + + /** + * Delegates for MetricReader.shutdown. + */ + async shutdown(): Promise { + return this._metricReader.shutdown(); + } +} + +/** + * An internal interface for MetricCollector. Exposes the necessary + * information for metric collection. + */ +export interface MetricCollectorHandle { + aggregatorTemporality: AggregationTemporality; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts new file mode 100644 index 0000000000..7369800bd0 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { Resource } from '@opentelemetry/resources'; +import { MetricData } from '../export/MetricData'; +import { Maybe } from '../utils'; +import { MetricCollectorHandle } from './MetricCollector'; + +/** + * Internal interface. + * + * Represents a storage from which we can collect metrics. + */ +export interface MetricStorage { + /** + * Collects the metrics from this storage. + * + * Note: This is a stateful operation and may reset any interval-related + * state for the MetricCollector. + */ + collect( + collector: MetricCollectorHandle, + collectors: MetricCollectorHandle[], + resource: Resource, + instrumentationLibrary: InstrumentationLibrary, + sdkStartTime: HrTime, + collectionTime: HrTime, + ): Promise>; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts index 3dc76048dc..50e3dbc103 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts @@ -18,6 +18,9 @@ import { Context } from '@opentelemetry/api'; import { Attributes } from '@opentelemetry/api-metrics-wip'; import { WritableMetricStorage } from './WritableMetricStorage'; +/** + * Internal interface. + */ export class MultiMetricStorage implements WritableMetricStorage { constructor(private readonly _backingStorages: WritableMetricStorage[]) {} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts new file mode 100644 index 0000000000..cdc1e72b1a --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts @@ -0,0 +1,85 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Context, HrTime } from '@opentelemetry/api'; +import { Attributes } from '@opentelemetry/api-metrics-wip'; +import { WritableMetricStorage } from './WritableMetricStorage'; +import { Accumulation, Aggregator } from '../aggregator/types'; +import { View } from '../view/View'; +import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor'; +import { AttributesProcessor } from '../view/AttributesProcessor'; +import { MetricStorage } from './MetricStorage'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { Resource } from '@opentelemetry/resources'; +import { MetricData } from '../export/MetricData'; +import { DeltaMetricProcessor } from './DeltaMetricProcessor'; +import { TemporalMetricProcessor } from './TemporalMetricProcessor'; +import { Maybe } from '../utils'; +import { MetricCollectorHandle } from './MetricCollector'; + +/** + * Internal interface. + * + * Stores and aggregates {@link MetricData} for synchronous instruments. + */ +export class SyncMetricStorage> implements WritableMetricStorage, MetricStorage { + private _deltaMetricStorage: DeltaMetricProcessor; + private _temporalMetricStorage: TemporalMetricProcessor; + + constructor(private _instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator, private _attributesProcessor: AttributesProcessor) { + this._deltaMetricStorage = new DeltaMetricProcessor(aggregator); + this._temporalMetricStorage = new TemporalMetricProcessor(aggregator); + } + + record(value: number, attributes: Attributes, context: Context) { + attributes = this._attributesProcessor.process(attributes, context); + this._deltaMetricStorage.record(value, attributes, context); + } + + /** + * Collects the metrics from this storage. + * + * Note: This is a stateful operation and may reset any interval-related + * state for the MetricCollector. + */ + async collect( + collector: MetricCollectorHandle, + collectors: MetricCollectorHandle[], + resource: Resource, + instrumentationLibrary: InstrumentationLibrary, + sdkStartTime: HrTime, + collectionTime: HrTime, + ): Promise> { + const accumulations = this._deltaMetricStorage.collect(); + + return this._temporalMetricStorage.buildMetrics( + collector, + collectors, + resource, + instrumentationLibrary, + this._instrumentDescriptor, + accumulations, + sdkStartTime, + collectionTime + ); + } + + static create(view: View, instrument: InstrumentDescriptor): SyncMetricStorage> { + instrument = createInstrumentDescriptorWithView(view, instrument); + const aggregator = view.aggregation.createAggregator(instrument); + return new SyncMetricStorage(instrument, aggregator, view.attributesProcessor); + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts new file mode 100644 index 0000000000..f7154469dd --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts @@ -0,0 +1,166 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import { AccumulationRecord, Aggregator } from '../aggregator/types'; +import { MetricData } from '../export/MetricData'; +import { Resource } from '@opentelemetry/resources'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { InstrumentDescriptor } from '../InstrumentDescriptor'; +import { AggregationTemporality } from '../export/AggregationTemporality'; +import { Maybe } from '../utils'; +import { MetricCollectorHandle } from './MetricCollector'; +import { AttributeHashMap } from './HashMap'; + +/** + * Remembers what was presented to a specific exporter. + */ +interface LastReportedHistory { + /** + * The last accumulation of metric data. + */ + accumulations: AttributeHashMap; + /** + * The timestamp the data was reported. + */ + collectionTime: HrTime; +} + +/** + * Internal interface. + * + * Provides unique reporting for each collectors. Allows synchronous collection + * of metrics and reports given temporality values. + */ +export class TemporalMetricProcessor { + private _unreportedAccumulations = new Map[]>(); + private _reportHistory = new Map>(); + + constructor(private _aggregator: Aggregator) {} + + /** + * Builds the {@link MetricData} streams to report against a specific MetricCollector. + * @param collector The information of the MetricCollector. + * @param collectors The registered collectors. + * @param resource The resource to attach these metrics against. + * @param instrumentationLibrary The instrumentation library that generated these metrics. + * @param instrumentDescriptor The instrumentation descriptor that these metrics generated with. + * @param currentAccumulations The current accumulation of metric data from instruments. + * @param sdkStartTime The sdk start timestamp. + * @param collectionTime The current collection timestamp. + * @returns The {@link MetricData} points or {@code null}. + */ + buildMetrics( + collector: MetricCollectorHandle, + collectors: MetricCollectorHandle[], + resource: Resource, + instrumentationLibrary: InstrumentationLibrary, + instrumentDescriptor: InstrumentDescriptor, + currentAccumulations: AttributeHashMap, + sdkStartTime: HrTime, + collectionTime: HrTime, + ): Maybe { + const aggregationTemporality = collector.aggregatorTemporality; + // In case it's our first collection, default to start timestamp (see below for explanation). + let lastCollectionTime = sdkStartTime; + + this._stashAccumulations(collectors, currentAccumulations); + const unreportedAccumulations = this._getMergedUnreportedAccumulations(collector); + + let result = unreportedAccumulations; + // Check our last report time. + if (this._reportHistory.has(collector)) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const last = this._reportHistory.get(collector)!; + lastCollectionTime = last.collectionTime; + + // Use aggregation temporality + instrument to determine if we do a merge or a diff of + // previous. We have the following four scenarios: + // 1. Cumulative Aggregation (temporality) + Delta recording (sync instrument). + // Here we merge with our last record to get a cumulative aggregation. + // 2. Cumulative Aggregation + Cumulative recording - do nothing + // 3. Delta Aggregation + Delta recording - do nothing. + // 4. Delta Aggregation + Cumulative recording (async instrument) - do nothing + if (aggregationTemporality === AggregationTemporality.CUMULATIVE) { + // We need to make sure the current delta recording gets merged into the previous cumulative + // for the next cumulative measurement. + result = TemporalMetricProcessor.merge(last.accumulations, unreportedAccumulations, this._aggregator); + } + } + + // Update last reported (cumulative) accumulation. + this._reportHistory.set(collector, { + accumulations: result, + collectionTime, + }); + + // Metric data time span is determined in Aggregator.toMetricData with aggregation temporality: + // 1. Cumulative Aggregation time span: (sdkStartTime, collectionTime] + // 2. Delta Aggregation time span: (lastCollectionTime, collectionTime] + return this._aggregator.toMetricData( + resource, + instrumentationLibrary, + instrumentDescriptor, + AttributesMapToAccumulationRecords(result), + aggregationTemporality, + sdkStartTime, + lastCollectionTime, + collectionTime); + } + + private _stashAccumulations(collectors: MetricCollectorHandle[], currentAccumulation: AttributeHashMap) { + collectors.forEach(it => { + let stash = this._unreportedAccumulations.get(it); + if (stash === undefined) { + stash = []; + this._unreportedAccumulations.set(it, stash); + } + stash.push(currentAccumulation); + }); + } + + private _getMergedUnreportedAccumulations(collector: MetricCollectorHandle) { + let result = new AttributeHashMap(); + const unreportedList = this._unreportedAccumulations.get(collector); + this._unreportedAccumulations.set(collector, []); + if (unreportedList === undefined) { + return result; + } + for (const it of unreportedList) { + result = TemporalMetricProcessor.merge(result, it, this._aggregator); + } + return result; + } + + static merge(last: AttributeHashMap, current: AttributeHashMap, aggregator: Aggregator) { + const result = last; + const iterator = current.entries(); + let next = iterator.next(); + while (next.done !== true) { + const [key, record, hash] = next.value; + const lastAccumulation = last.get(key, hash) ?? aggregator.createAccumulation(); + result.set(key, aggregator.merge(lastAccumulation, record), hash); + + next = iterator.next(); + } + return result; + } +} + +// TypeScript complains about converting 3 elements tuple to AccumulationRecord. +function AttributesMapToAccumulationRecords(map: AttributeHashMap): AccumulationRecord[] { + return Array.from(map.entries()) as unknown as AccumulationRecord[]; +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts index b72ecb6637..4233dedd24 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts @@ -17,7 +17,13 @@ import { Context } from '@opentelemetry/api'; import { Attributes } from '@opentelemetry/api-metrics-wip'; +/** + * Internal interface. + * + * Stores {@link MetricData} and allows synchronous writes of measurements. + */ export interface WritableMetricStorage { + /** Records a measurement. */ record(value: number, attributes: Attributes, context: Context): void; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts index 9adff043d4..7f269755f2 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts @@ -14,4 +14,27 @@ * limitations under the License. */ +import { Attributes } from '@opentelemetry/api-metrics-wip'; + export type Maybe = T | undefined; + +export function isNotNullish(item: Maybe): item is T { + return item !== undefined && item !== null; +} + +/** + * Converting the unordered attributes into unique identifier string. + * @param attributes user provided unordered Attributes. + */ +export function hashAttributes(attributes: Attributes): string { + let keys = Object.keys(attributes); + if (keys.length === 0) return ''; + + keys = keys.sort(); + return keys.reduce((result, key) => { + if (result.length > 2) { + result += ','; + } + return (result += key + ':' + attributes[key]); + }, '|#'); +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/Meter.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/Meter.test.ts new file mode 100644 index 0000000000..785d0b5635 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/Meter.test.ts @@ -0,0 +1,47 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { Counter, Histogram, UpDownCounter } from '../src/Instruments'; +import { Meter } from '../src/Meter'; +import { MeterProviderSharedState } from '../src/state/MeterProviderSharedState'; +import { defaultInstrumentationLibrary, defaultResource } from './util'; + +describe('Meter', () => { + describe('createCounter', () => { + it('should create counter', () => { + const meter = new Meter(new MeterProviderSharedState(defaultResource), defaultInstrumentationLibrary); + const counter = meter.createCounter('foobar'); + assert(counter instanceof Counter); + }); + }); + + describe('createUpDownCounter', () => { + it('should create up down counter', () => { + const meter = new Meter(new MeterProviderSharedState(defaultResource), defaultInstrumentationLibrary); + const counter = meter.createUpDownCounter('foobar'); + assert(counter instanceof UpDownCounter); + }); + }); + + describe('createHistogram', () => { + it('should create histogram', () => { + const meter = new Meter(new MeterProviderSharedState(defaultResource), defaultInstrumentationLibrary); + const counter = meter.createHistogram('foobar'); + assert(counter instanceof Histogram); + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/DeltaMetricProcessor.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/DeltaMetricProcessor.test.ts new file mode 100644 index 0000000000..8d606d64b9 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/DeltaMetricProcessor.test.ts @@ -0,0 +1,70 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as api from '@opentelemetry/api'; +import * as assert from 'assert'; +import { DropAggregator, SumAggregator } from '../../src/aggregator'; +import { DeltaMetricProcessor } from '../../src/state/DeltaMetricProcessor'; +import { commonAttributes, commonValues } from '../util'; + +describe('DeltaMetricProcessor', () => { + describe('record', () => { + it('no exceptions on record with DropAggregator', () => { + const metricStorage = new DeltaMetricProcessor(new DropAggregator()); + + for (const value of commonValues) { + for (const attributes of commonAttributes) { + metricStorage.record(value, attributes, api.context.active()); + } + } + }); + + it('no exceptions on record with no-drop aggregator', () => { + const metricStorage = new DeltaMetricProcessor(new SumAggregator()); + + for (const value of commonValues) { + for (const attributes of commonAttributes) { + metricStorage.record(value, attributes, api.context.active()); + } + } + }); + }); + + describe('collect', () => { + it('should export', () => { + const metricStorage = new DeltaMetricProcessor(new SumAggregator()); + + metricStorage.record(1, { attribute: '1' }, api.ROOT_CONTEXT); + metricStorage.record(2, { attribute: '1' }, api.ROOT_CONTEXT); + metricStorage.record(1, { attribute: '2' }, api.ROOT_CONTEXT); + + let accumulations = metricStorage.collect(); + assert.strictEqual(accumulations.size, 2); + { + const accumulation = accumulations.get({ attribute: '1' }); + assert.strictEqual(accumulation?.toPoint(), 3); + } + { + const accumulation = accumulations.get({ attribute: '2' }); + assert.strictEqual(accumulation?.toPoint(), 1); + } + + /** the accumulations shall be reset. */ + accumulations = metricStorage.collect(); + assert.strictEqual(accumulations.size, 0); + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/HashMap.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/HashMap.test.ts new file mode 100644 index 0000000000..84224f3d94 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/HashMap.test.ts @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { Attributes } from '@opentelemetry/api-metrics-wip'; +import { HashMap } from '../../src/state/HashMap'; +import { hashAttributes } from '../../src/utils'; + +describe('HashMap', () => { + describe('set & get', () => { + it('should get and set with attributes', () => { + const map = new HashMap(hashAttributes); + const hash = hashAttributes({ foo: 'bar' }); + + map.set({ foo: 'bar' }, 1); + // get with pinned hash code + assert.strictEqual(map.get({}, hash), 1); + // get with attributes object. + assert.strictEqual(map.get({ foo: 'bar' }), 1); + + map.set({}, 2, hash); + // get with pinned hash code + assert.strictEqual(map.get({}, hash), 2); + // get with attributes object. + assert.strictEqual(map.get({ foo: 'bar' }), 2); + }); + }); + + describe('entries', () => { + it('iterating with entries', () => { + const map = new HashMap(hashAttributes); + map.set({ foo: '1' }, 1); + map.set({ foo: '2' }, 2); + map.set({ foo: '3' }, 3); + map.set({ foo: '4' }, 4); + + const entries = Array.from(map.entries()); + assert.deepStrictEqual(entries, [ + [{ foo: '1' }, 1, hashAttributes({ foo: '1' })], + [{ foo: '2' }, 2, hashAttributes({ foo: '2' })], + [{ foo: '3' }, 3, hashAttributes({ foo: '3' })], + [{ foo: '4' }, 4, hashAttributes({ foo: '4' })], + ]); + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts new file mode 100644 index 0000000000..900225cdf3 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -0,0 +1,122 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import * as sinon from 'sinon'; +import { AggregationTemporality } from '../../src/export/AggregationTemporality'; +import { MetricData, PointDataType } from '../../src/export/MetricData'; +import { MetricExporter } from '../../src/export/MetricExporter'; +import { MetricReader } from '../../src/export/MetricReader'; +import { Meter } from '../../src/Meter'; +import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState'; +import { MetricCollector } from '../../src/state/MetricCollector'; +import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertPointData } from '../util'; + +class TestMetricExporter extends MetricExporter { + metricDataList: MetricData[] = [] + async export(batch: MetricData[]): Promise { + this.metricDataList.push(...batch); + } + + async forceFlush(): Promise {} + + getPreferredAggregationTemporality(): AggregationTemporality { + return AggregationTemporality.CUMULATIVE; + } +} + +class TestDeltaMetricExporter extends TestMetricExporter { + override getPreferredAggregationTemporality(): AggregationTemporality { + return AggregationTemporality.DELTA; + } +} + +class TestMetricReader extends MetricReader { + getMetricCollector(): MetricCollector { + return this['_metricProducer'] as MetricCollector; + } +} + +describe('MetricCollector', () => { + afterEach(() => { + sinon.restore(); + }) + + describe('constructor', () => { + it('should construct MetricCollector without exceptions', () => { + const meterProviderSharedState = new MeterProviderSharedState(defaultResource); + const exporters = [ new TestMetricExporter(), new TestDeltaMetricExporter() ]; + for (const exporter of exporters) { + const reader = new TestMetricReader(exporter); + const metricCollector = new MetricCollector(meterProviderSharedState, reader); + + assert.strictEqual(metricCollector.aggregatorTemporality, exporter.getPreferredAggregationTemporality()); + } + }); + }); + + describe('collect', () => { + function setupInstruments(exporter: MetricExporter) { + // TODO(legendecas): setup with MeterProvider when meter identity was settled. + const meterProviderSharedState = new MeterProviderSharedState(defaultResource); + + const reader = new TestMetricReader(exporter); + const metricCollector = new MetricCollector(meterProviderSharedState, reader); + meterProviderSharedState.metricCollectors.push(metricCollector); + + const meter = new Meter(meterProviderSharedState, defaultInstrumentationLibrary); + meterProviderSharedState.meters.set('test-meter', meter); + + return { metricCollector, meter }; + } + + it('should collect metrics', async () => { + /** preparing test instrumentations */ + const exporter = new TestMetricExporter(); + const { metricCollector, meter } = setupInstruments(exporter); + + /** creating metric events */ + const counter = meter.createCounter('counter1'); + counter.add(1, {}); + counter.add(2, { foo: 'bar' }); + + const counter2 = meter.createCounter('counter2'); + counter2.add(3); + + /** collect metrics */ + const batch = await metricCollector.collect(); + assert(Array.isArray(batch)); + assert.strictEqual(batch.length, 2); + + /** checking batch[0] */ + const metricData1 = batch[0]; + assertMetricData(metricData1, PointDataType.SINGULAR, { + name: 'counter1' + }, defaultInstrumentationLibrary); + assert.strictEqual(metricData1.pointData.length, 2); + assertPointData(metricData1.pointData[0], {}, 1); + assertPointData(metricData1.pointData[1], { foo: 'bar' }, 2); + + /** checking batch[1] */ + const metricData2 = batch[1]; + assertMetricData(metricData2, PointDataType.SINGULAR, { + name: 'counter2' + }, defaultInstrumentationLibrary); + assert.strictEqual(metricData2.pointData.length, 1); + assertPointData(metricData2.pointData[0], {}, 3); + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts new file mode 100644 index 0000000000..64c8daaa6f --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts @@ -0,0 +1,66 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as api from '@opentelemetry/api'; +import { Attributes } from '@opentelemetry/api-metrics-wip'; +import * as assert from 'assert'; +import { Measurement } from '../../src/Measurement'; +import { MultiMetricStorage } from '../../src/state/MultiWritableMetricStorage'; +import { WritableMetricStorage } from '../../src/state/WritableMetricStorage'; +import { assertMeasurementEqual, commonAttributes, commonValues } from '../util'; + +describe('MultiMetricStorage', () => { + describe('record', () => { + it('no exceptions on record', () => { + const metricStorage = new MultiMetricStorage([]); + + for (const value of commonValues) { + for (const attribute of commonAttributes) { + metricStorage.record(value, attribute, api.context.active()); + } + } + }); + + it('record with multiple backing storages', () => { + class TestWritableMetricStorage implements WritableMetricStorage { + records: Measurement[] = []; + record(value: number, attributes: Attributes, context: api.Context): void { + this.records.push({ value, attributes, context }); + } + } + + const backingStorage1 = new TestWritableMetricStorage(); + const backingStorage2 = new TestWritableMetricStorage(); + const metricStorage = new MultiMetricStorage([backingStorage1, backingStorage2]); + + const expectedMeasurements: Measurement[] = []; + for (const value of commonValues) { + for (const attributes of commonAttributes) { + const context = api.context.active() + expectedMeasurements.push({ value, attributes, context }) + metricStorage.record(value, attributes, context); + } + } + + assert.strictEqual(backingStorage1.records.length, expectedMeasurements.length); + assert.strictEqual(backingStorage2.records.length, expectedMeasurements.length); + for (const [idx, expected] of expectedMeasurements.entries()) { + assertMeasurementEqual(backingStorage1.records[idx], expected); + assertMeasurementEqual(backingStorage2.records[idx], expected); + } + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts new file mode 100644 index 0000000000..e782c14876 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts @@ -0,0 +1,158 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as api from '@opentelemetry/api'; +import { hrTime } from '@opentelemetry/core'; +import * as assert from 'assert'; + +import { SumAggregator } from '../../src/aggregator'; +import { AggregationTemporality } from '../../src/export/AggregationTemporality'; +import { PointDataType } from '../../src/export/MetricData'; +import { MetricCollectorHandle } from '../../src/state/MetricCollector'; +import { SyncMetricStorage } from '../../src/state/SyncMetricStorage'; +import { NoopAttributesProcessor } from '../../src/view/AttributesProcessor'; +import { assertMetricData, assertPointData, commonAttributes, commonValues, defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource } from '../util'; + +const deltaCollector: MetricCollectorHandle = { + aggregatorTemporality: AggregationTemporality.DELTA, +}; + +const cumulativeCollector: MetricCollectorHandle = { + aggregatorTemporality: AggregationTemporality.CUMULATIVE, +}; + +const sdkStartTime = hrTime(); + +describe('SyncMetricStorage', () => { + describe('record', () => { + it('no exceptions on record', () => { + const metricStorage = new SyncMetricStorage(defaultInstrumentDescriptor, new SumAggregator(), new NoopAttributesProcessor()); + + for (const value of commonValues) { + for (const attributes of commonAttributes) { + metricStorage.record(value, attributes, api.context.active()); + } + } + }); + }); + + describe('collect', () => { + describe('Delta Collector', () => { + const collectors = [deltaCollector]; + it('should collect and reset memos', async () => { + const metricStorage = new SyncMetricStorage(defaultInstrumentDescriptor, new SumAggregator(), new NoopAttributesProcessor()); + metricStorage.record(1, {}, api.context.active()); + metricStorage.record(2, {}, api.context.active()); + metricStorage.record(3, {}, api.context.active()); + { + const metric = await metricStorage.collect( + deltaCollector, + collectors, + defaultResource, + defaultInstrumentationLibrary, + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 6); + } + + // The attributes should not be memorized. + { + const metric = await metricStorage.collect( + deltaCollector, + collectors, + defaultResource, + defaultInstrumentationLibrary, + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 0); + } + + metricStorage.record(1, {}, api.context.active()); + { + const metric = await metricStorage.collect( + deltaCollector, + [deltaCollector], + defaultResource, + defaultInstrumentationLibrary, + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 1); + } + }); + }); + + describe('Cumulative Collector', () => { + const collectors = [cumulativeCollector]; + it('should collect cumulative metrics', async () => { + const metricStorage = new SyncMetricStorage(defaultInstrumentDescriptor, new SumAggregator(), new NoopAttributesProcessor()); + metricStorage.record(1, {}, api.context.active()); + metricStorage.record(2, {}, api.context.active()); + metricStorage.record(3, {}, api.context.active()); + { + const metric = await metricStorage.collect( + cumulativeCollector, + collectors, + defaultResource, + defaultInstrumentationLibrary, + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 6); + } + + // The attributes should be memorized. + { + const metric = await metricStorage.collect( + cumulativeCollector, + collectors, + defaultResource, + defaultInstrumentationLibrary, + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 6); + } + + metricStorage.record(1, {}, api.context.active()); + { + const metric = await metricStorage.collect( + cumulativeCollector, + collectors, + defaultResource, + defaultInstrumentationLibrary, + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 7); + } + }); + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricProcessor.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricProcessor.test.ts new file mode 100644 index 0000000000..081ff0d035 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricProcessor.test.ts @@ -0,0 +1,247 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as api from '@opentelemetry/api'; +import { hrTime } from '@opentelemetry/core'; +import * as assert from 'assert'; +import { SumAggregator } from '../../src/aggregator'; +import { AggregationTemporality } from '../../src/export/AggregationTemporality'; +import { PointDataType } from '../../src/export/MetricData'; +import { DeltaMetricProcessor } from '../../src/state/DeltaMetricProcessor'; +import { MetricCollectorHandle } from '../../src/state/MetricCollector'; +import { TemporalMetricProcessor } from '../../src/state/TemporalMetricProcessor'; +import { assertMetricData, assertPointData, defaultInstrumentationLibrary, defaultInstrumentDescriptor, defaultResource } from '../util'; + +const deltaCollector1: MetricCollectorHandle = { + aggregatorTemporality: AggregationTemporality.DELTA, +}; + +const deltaCollector2: MetricCollectorHandle = { + aggregatorTemporality: AggregationTemporality.DELTA, +}; + +const cumulativeCollector1: MetricCollectorHandle = { + aggregatorTemporality: AggregationTemporality.CUMULATIVE, +}; + +const sdkStartTime = hrTime(); + +describe('TemporalMetricProcessor', () => { + describe('buildMetrics', () => { + describe('single delta collector', () => { + const collectors = [ deltaCollector1 ]; + + it('should build metrics', () => { + const aggregator = new SumAggregator(); + const deltaMetricStorage = new DeltaMetricProcessor(aggregator); + const temporalMetricStorage = new TemporalMetricProcessor(aggregator); + + deltaMetricStorage.record(1, {}, api.context.active()); + { + const metric = temporalMetricStorage.buildMetrics( + deltaCollector1, + collectors, + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + deltaMetricStorage.collect(), + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 1); + } + + deltaMetricStorage.record(2, {}, api.context.active()); + { + const metric = temporalMetricStorage.buildMetrics( + deltaCollector1, + collectors, + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + deltaMetricStorage.collect(), + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 2); + } + + { + const metric = temporalMetricStorage.buildMetrics( + deltaCollector1, + collectors, + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + deltaMetricStorage.collect(), + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 0); + } + }); + }); + + describe('two delta collector', () => { + const collectors = [ deltaCollector1, deltaCollector2 ]; + + it('should build metrics', () => { + const aggregator = new SumAggregator(); + const deltaMetricStorage = new DeltaMetricProcessor(aggregator); + const temporalMetricStorage = new TemporalMetricProcessor(aggregator); + + deltaMetricStorage.record(1, {}, api.context.active()); + { + const metric = temporalMetricStorage.buildMetrics( + deltaCollector1, + collectors, + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + deltaMetricStorage.collect(), + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 1); + } + + { + const metric = temporalMetricStorage.buildMetrics( + deltaCollector2, + collectors, + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + deltaMetricStorage.collect(), + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 1); + } + }); + }); + + describe('single cumulative collector', () => { + const collectors = [ cumulativeCollector1 ]; + it('should build metrics', () => { + const aggregator = new SumAggregator(); + const deltaMetricStorage = new DeltaMetricProcessor(aggregator); + const temporalMetricStorage = new TemporalMetricProcessor(aggregator); + + deltaMetricStorage.record(1, {}, api.context.active()); + { + const metric = temporalMetricStorage.buildMetrics( + cumulativeCollector1, + collectors, + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + deltaMetricStorage.collect(), + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 1); + } + + deltaMetricStorage.record(2, {}, api.context.active()); + { + const metric = temporalMetricStorage.buildMetrics( + cumulativeCollector1, + collectors, + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + deltaMetricStorage.collect(), + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 3); + } + }); + }); + + describe('cumulative collector with delta collector', () => { + const collectors = [ deltaCollector1, cumulativeCollector1 ]; + it('should build metrics', () => { + const aggregator = new SumAggregator(); + const deltaMetricStorage = new DeltaMetricProcessor(aggregator); + const temporalMetricStorage = new TemporalMetricProcessor(aggregator); + + deltaMetricStorage.record(1, {}, api.context.active()); + { + const metric = temporalMetricStorage.buildMetrics( + cumulativeCollector1, + collectors, + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + deltaMetricStorage.collect(), + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 1); + } + + deltaMetricStorage.record(2, {}, api.context.active()); + { + const metric = temporalMetricStorage.buildMetrics( + deltaCollector1, + collectors, + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + deltaMetricStorage.collect(), + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 3); + } + { + const metric = temporalMetricStorage.buildMetrics( + cumulativeCollector1, + collectors, + defaultResource, + defaultInstrumentationLibrary, + defaultInstrumentDescriptor, + deltaMetricStorage.collect(), + sdkStartTime, + hrTime()); + + assertMetricData(metric, PointDataType.SINGULAR); + assert.strictEqual(metric.pointData.length, 1); + assertPointData(metric.pointData[0], {}, 3); + } + }); + }); + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts index bf56a9d50f..58e8bcbb6c 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts @@ -17,8 +17,13 @@ import { Attributes, ValueType } from '@opentelemetry/api-metrics'; import { InstrumentationLibrary } from '@opentelemetry/core'; import { Resource } from '@opentelemetry/resources'; +import * as assert from 'assert'; import { InstrumentDescriptor } from '../src/InstrumentDescriptor'; -import { InstrumentType } from '../src/Instruments'; +import { Histogram, InstrumentType } from '../src/Instruments'; +import { MetricData, PointData, PointDataType } from '../src/export/MetricData'; +import { Measurement } from '../src/Measurement'; +import { isNotNullish } from '../src/utils'; +import { HrTime } from '@opentelemetry/api'; export const defaultResource = new Resource({ resourceKey: 'my-resource', @@ -47,3 +52,68 @@ export const sleep = (time: number) => new Promise(resolve => { return setTimeout(resolve, time); }); + +export function assertMetricData( + actual: unknown, + pointDataType?: PointDataType, + instrumentDescriptor: Partial = defaultInstrumentDescriptor, + instrumentationLibrary: Partial = defaultInstrumentationLibrary, + resource: Resource = defaultResource, +): asserts actual is MetricData { + const it = actual as MetricData; + assert.deepStrictEqual(it.resource, resource); + assertPartialDeepStrictEqual(it.instrumentDescriptor, instrumentDescriptor); + assertPartialDeepStrictEqual(it.instrumentationLibrary, instrumentationLibrary); + if (isNotNullish(pointDataType)) { + assert.strictEqual(it.pointDataType, pointDataType); + } else { + assert(isNotNullish(PointDataType[it.pointDataType])); + } + assert(Array.isArray(it.pointData)); +} + +export function assertPointData( + actual: unknown, + attributes: Attributes, + point: Histogram | number, + startTime?: HrTime, + endTime?: HrTime, +): asserts actual is PointData { + const it = actual as PointData; + assert.deepStrictEqual(it.attributes, attributes); + assert.deepStrictEqual(it.point, point); + if (startTime) { + assert.deepStrictEqual(it.startTime, startTime); + } else { + assert(Array.isArray(it.startTime)); + assert.strictEqual(it.startTime.length, 2); + } + if (endTime) { + assert.deepStrictEqual(it.endTime, endTime); + } else { + assert(Array.isArray(it.endTime)); + assert.strictEqual(it.endTime.length, 2); + } +} + +export function assertMeasurementEqual(actual: unknown, expected: Measurement): asserts actual is Measurement { + // NOTE: Node.js v8 assert.strictEquals treat two NaN as different values. + if (Number.isNaN(expected.value)) { + assert(Number.isNaN((actual as Measurement).value)); + } else { + assert.strictEqual((actual as Measurement).value, expected.value); + } + assert.deepStrictEqual((actual as Measurement).attributes, expected.attributes); + assert.deepStrictEqual((actual as Measurement).context, expected.context); +} + +export function assertPartialDeepStrictEqual(actual: unknown, expected: T, message?: string): asserts actual is T { + assert.strictEqual(typeof actual, typeof expected, message); + if (typeof expected !== 'object' && typeof expected !== 'function') { + return; + } + const ownNames = Object.getOwnPropertyNames(expected); + for (const ownName of ownNames) { + assert.deepStrictEqual((actual as any)[ownName], (expected as any)[ownName], message); + } +}