Skip to content

Commit

Permalink
feat: bootstrap metric streams
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed Nov 22, 2021
1 parent a80701f commit da450e4
Show file tree
Hide file tree
Showing 16 changed files with 678 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import { MetricOptions, ValueType } from '@opentelemetry/api-metrics-wip';
import { InstrumentType } from './Instruments';
import { View } from './view/View';


export interface InstrumentDescriptor {
readonly name: string;
Expand All @@ -34,3 +36,13 @@ export function createInstrumentDescriptor(name: string, type: InstrumentType, o
valueType: options?.valueType ?? ValueType.DOUBLE,
};
}

export function createInstrumentDescriptorWithView(view: View, instrument: InstrumentDescriptor): InstrumentDescriptor {
return {
name: view.name ?? instrument.name,
description: view.description ?? instrument.description,
type: instrument.type,
unit: instrument.unit,
valueType: instrument.valueType,
};
}
28 changes: 22 additions & 6 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@
*/

import * as metrics from '@opentelemetry/api-metrics-wip';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { hrTime, InstrumentationLibrary } from '@opentelemetry/core';
import { createInstrumentDescriptor, InstrumentDescriptor } from './InstrumentDescriptor';
import { Counter, Histogram, InstrumentType, UpDownCounter } from './Instruments';
import { MeterProviderSharedState } from './state/MeterProviderSharedState';
import { MultiMetricStorage } from './state/MultiWritableMetricStorage';
import { NoopWritableMetricStorage, WritableMetricStorage } from './state/WritableMetricStorage';
import { SyncMetricStorage } from './state/SyncMetricStorage';
import { MetricStorage } from './state/MetricStorage';
import { CollectorInfo } from './export/CollectorInfo';
import { MetricData } from './export/MetricData';
import { isNotNullish } from './utils';

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#meter

export class Meter implements metrics.Meter {
private _metricStorageRegistry = new Map<string, WritableMetricStorage>();
private _metricStorageRegistry = new Map<string, MetricStorage>();

// instrumentation library required by spec to be on meter
// spec requires provider config changes to apply to previously created meters, achieved by holding a reference to the provider
Expand Down Expand Up @@ -68,9 +72,8 @@ export class Meter implements metrics.Meter {

private _registerMetricStorage(descriptor: InstrumentDescriptor) {
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
const storages = views.map(_view => {
// TODO: create actual metric storages.
const storage = new NoopWritableMetricStorage();
const storages = views.map(view => {
const storage = SyncMetricStorage.create(view, descriptor);
// TODO: handle conflicts
this._metricStorageRegistry.set(descriptor.name, storage);
return storage;
Expand All @@ -80,4 +83,17 @@ export class Meter implements metrics.Meter {
}
return new MultiMetricStorage(storages);
}

async collectAll(collectorInfo: CollectorInfo): Promise<MetricData[]> {
const collectionTime = hrTime();
const result = await Promise.all(Array.from(this._metricStorageRegistry.values()).map(metricStorage => {
return metricStorage.collectAndReset(
collectorInfo,
this._meterProviderSharedState.resource,
this._instrumentationLibrary,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
}));
return result.filter(isNotNullish);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { AggregationTemporality } from '@opentelemetry/api-metrics-wip';

/**
* A opaque handle for a collection-pipeline of metrics.
*
* This provides an efficient means of leasing and tracking metric readers.
*/
export type CollectorHandle = symbol;

/**
* An internal record about a {@link MetricReader} used when collecting metrics.
*/
export interface CollectorInfo {
collectorHandle: CollectorHandle;
aggregationTemporality: AggregationTemporality;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Context } from '@opentelemetry/api';
import { Attributes } from '@opentelemetry/api-metrics-wip';
import { hashAttributes, Maybe } from '../utils';
import { Accumulation, AccumulationRecord, Aggregator } from '../aggregator/types';

/**
* Allows synchronous collection of metrics.
*
* This storage should allow allocation of new aggregation cells for metrics and unique reporting
* of per-collection delta accumulations.
*/
export class DeltaMetricStorage<T extends Maybe<Accumulation>> {
private _activeCollectionStorage = new Map<string, AccumulationRecord<T>>();

constructor(private _aggregator: Aggregator<T>) {}

/** Bind an efficient storage handle for a set of attributes. */
private bind(attributes: Attributes) {
const hash = hashAttributes(attributes);
let accumulationRecord: AccumulationRecord<T>;
if (this._activeCollectionStorage.has(hash)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
accumulationRecord = this._activeCollectionStorage.get(hash)!;
} else {
accumulationRecord = [attributes, this._aggregator.createAccumulation()] as AccumulationRecord<T>;
this._activeCollectionStorage.set(hash, accumulationRecord);
}
return accumulationRecord[1];
}

record(value: number, attributes: Attributes, _context: Context) {
const accumulation = this.bind(attributes);
accumulation?.record(value);
}

collect() {
const unreportedDelta = this._activeCollectionStorage;
this._activeCollectionStorage = new Map();
return unreportedDelta;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

import { hrTime } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { ViewRegistry } from '../view/ViewRegistry';

Expand All @@ -22,6 +23,10 @@ import { ViewRegistry } from '../view/ViewRegistry';
*/
export class MeterProviderSharedState {
viewRegistry = new ViewRegistry();
// TODO: we should probably Object.freeze here but,
// return type Object.freeze(hrTime()) is `readonly [number, number]` which
// is not assignable to HrTime.
readonly sdkStartTime = hrTime();

constructor(public resource: Resource) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { HrTime } from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { CollectorInfo } from '../export/CollectorInfo';
import { MetricData } from '../export/MetricData';
import { Maybe } from '../utils';

export interface MetricStorage {
/**
* Collects the metrics from this storage and resets for the next
* collection period.
*
* Note: This is a stateful operation and will reset any interval-related
* state for the CollectorInfo.
*/
collectAndReset(
collectorInfo: CollectorInfo,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { Context } from '@opentelemetry/api';
import { Attributes } from '@opentelemetry/api-metrics';
import { Attributes } from '@opentelemetry/api-metrics-wip';
import { WritableMetricStorage } from './WritableMetricStorage';

export class MultiMetricStorage implements WritableMetricStorage {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Context, HrTime } from '@opentelemetry/api';
import { Attributes } from '@opentelemetry/api-metrics-wip';
import { WritableMetricStorage } from './WritableMetricStorage';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { CollectorInfo } from '../export/CollectorInfo';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { MetricData } from '../export/MetricData';
import { DeltaMetricStorage } from './DeltaMetricStorage';
import { TemporalMetricStorage } from './TemporalMetricStorage';
import { Maybe } from '../utils';

/**
* Stores and aggregate {@link MetricData} for synchronous instruments.
*/
export class SyncMetricStorage<T extends Maybe<Accumulation>> implements WritableMetricStorage, MetricStorage {
private _deltaMetricStorage: DeltaMetricStorage<T>;
private _temporalMetricStorage: TemporalMetricStorage<T>;

constructor(private _instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator<T>, private _attributesProcessor: AttributesProcessor) {
this._deltaMetricStorage = new DeltaMetricStorage(aggregator);
this._temporalMetricStorage = new TemporalMetricStorage(aggregator);
}

record(value: number, attributes: Attributes, context: Context) {
attributes = this._attributesProcessor.process(attributes, context);
this._deltaMetricStorage.record(value, attributes, context);
}

/**
* Collects the metrics from this storage and resets for the next
* collection period.
*
* Note: This is a stateful operation and will reset any interval-related
* state for the CollectorInfo.
*/
async collectAndReset(
collectorInfo: CollectorInfo,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(collectorInfo, resource, instrumentationLibrary, this._instrumentDescriptor, accumulations, sdkStartTime, collectionTime);
}

static create(view: View, instrument: InstrumentDescriptor): SyncMetricStorage<Maybe<Accumulation>> {
instrument = createInstrumentDescriptorWithView(view, instrument);
const aggregator = view.aggregation.createAggregator(instrument);
const storage = new SyncMetricStorage(instrument, aggregator, view.attributesProcessor);
return storage;
}
}
Loading

0 comments on commit da450e4

Please sign in to comment.