Skip to content

Commit

Permalink
feat(sdk-metrics-base): hoist async instrument callback invocations (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas authored May 6, 2022
1 parent d98548b commit 772e659
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 67 deletions.
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ All notable changes to experimental packages in this project will be documented
* feat(proto): add @opentelemetry/otlp-transformer package with hand-rolled transformation #2746 @dyladan
* feat(sdk-metrics-base): shutdown and forceflush on MeterProvider #2890 @legendecas
* feat(sdk-metrics-base): return the same meter for identical input to getMeter #2901 @legendecas
* feat(sdk-metrics-base): hoist async instrument callback invocations #2822 @legendecas

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ export interface Meter {

/**
* Creates a new `ObservableGauge` metric.
*
* The callback SHOULD be safe to be invoked concurrently.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand All @@ -92,6 +95,9 @@ export interface Meter {

/**
* Creates a new `ObservableCounter` metric.
*
* The callback SHOULD be safe to be invoked concurrently.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand All @@ -104,6 +110,9 @@ export interface Meter {

/**
* Creates a new `ObservableUpDownCounter` metric.
*
* The callback SHOULD be safe to be invoked concurrently.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,38 @@
*/

import { HrTime } from '@opentelemetry/api';
import { ObservableCallback } from '@opentelemetry/api-metrics';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { MetricData } from '../export/MetricData';
import { DeltaMetricProcessor } from './DeltaMetricProcessor';
import { TemporalMetricProcessor } from './TemporalMetricProcessor';
import { Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
import { ObservableResult } from '../ObservableResult';
import { AttributeHashMap } from './HashMap';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';

/**
* Internal interface.
*
* Stores and aggregates {@link MetricData} for asynchronous instruments.
*/
export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage {
export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage implements AsyncWritableMetricStorage {
private _deltaMetricStorage: DeltaMetricProcessor<T>;
private _temporalMetricStorage: TemporalMetricProcessor<T>;

constructor(
_instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
private _callback: ObservableCallback
) {
super(_instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}

private _record(measurements: AttributeHashMap<number>) {
record(measurements: AttributeHashMap<number>) {
const processed = new AttributeHashMap<number>();
Array.from(measurements.entries()).forEach(([attributes, value]) => {
processed.set(this._attributesProcessor.process(attributes), value);
Expand All @@ -67,17 +61,12 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
* Note: This is a stateful operation and may reset any interval-related
* state for the MetricCollector.
*/
async collect(
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
const observableResult = new ObservableResult();
// TODO: timeout with callback
await this._callback(observableResult);
this._record(observableResult.buffer);

): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
Expand All @@ -89,10 +78,4 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
collectionTime
);
}

static create(view: View, instrument: InstrumentDescriptor, callback: ObservableCallback): AsyncMetricStorage<Maybe<Accumulation>> {
instrument = createInstrumentDescriptorWithView(view, instrument);
const aggregator = view.aggregation.createAggregator(instrument);
return new AsyncMetricStorage(instrument, aggregator, view.attributesProcessor, callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import { MeterProviderSharedState } from './MeterProviderSharedState';
import { MetricCollectorHandle } from './MetricCollector';
import { MetricStorageRegistry } from './MetricStorageRegistry';
import { MultiMetricStorage } from './MultiWritableMetricStorage';
import { ObservableRegistry } from './ObservableRegistry';
import { SyncMetricStorage } from './SyncMetricStorage';

/**
* An internal record for shared meter provider states.
*/
export class MeterSharedState {
private _metricStorageRegistry = new MetricStorageRegistry();
private _observableRegistry = new ObservableRegistry();
meter: Meter;

constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {
Expand Down Expand Up @@ -60,8 +62,12 @@ export class MeterSharedState {
views.forEach(view => {
const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor);
const aggregator = view.aggregation.createAggregator(viewDescriptor);
const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor, callback);
this._metricStorageRegistry.register(viewStorage);
const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor);
const storage = this._metricStorageRegistry.register(viewStorage);
if (storage == null) {
return;
}
this._observableRegistry.addCallback(callback, storage);
});
}

Expand All @@ -75,15 +81,16 @@ export class MeterSharedState {
* 1. Call all observable callbacks first.
* 2. Collect metric result for the collector.
*/
const metricDataList = await Promise.all(Array.from(this._metricStorageRegistry.getStorages())
await this._observableRegistry.observe();
const metricDataList = Array.from(this._metricStorageRegistry.getStorages())
.map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
})
.filter(isNotNullish));
.filter(isNotNullish);

return {
instrumentationLibrary: this._instrumentationLibrary,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export abstract class MetricStorage {
collectors: MetricCollectorHandle[],
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>>;
): Maybe<MetricData>;

getInstrumentDescriptor(): InstrumentDescriptor{
return this._instrumentDescriptor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ObservableCallback } from '@opentelemetry/api-metrics';
import { ObservableResult } from '../ObservableResult';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';

/**
* An internal state interface for ObservableCallbacks.
*
* An ObservableCallback can be bound to multiple AsyncMetricStorage at once
* for batch observations. And an AsyncMetricStorage may be bound to multiple
* callbacks too.
*
* However an ObservableCallback must not be called multiple times during a
* single collection operation.
*/
export class ObservableRegistry {
private _callbacks: [ObservableCallback, AsyncWritableMetricStorage][] = [];

addCallback(callback: ObservableCallback, metricStorage: AsyncWritableMetricStorage) {
this._callbacks.push([callback, metricStorage]);
}

async observe(): Promise<void> {
// TODO: batch observables
// https://github.com/open-telemetry/opentelemetry-specification/pull/2363
const promise = Promise.all(this._callbacks
.map(async ([observableCallback, metricStorage]) => {
const observableResult = new ObservableResult();
// TODO: timeout with callback
// https://github.com/open-telemetry/opentelemetry-specification/issues/2295
await observableCallback(observableResult);
metricStorage.record(observableResult.buffer);
})
);

await promise;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ import { Context, HrTime } from '@opentelemetry/api';
import { MetricAttributes } from '@opentelemetry/api-metrics';
import { WritableMetricStorage } from './WritableMetricStorage';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { MetricData } from '../export/MetricData';
Expand Down Expand Up @@ -61,12 +57,12 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStor
* Note: This is a stateful operation and may reset any interval-related
* state for the MetricCollector.
*/
async collect(
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
Expand All @@ -78,10 +74,4 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStor
collectionTime
);
}

static create(view: View, instrument: InstrumentDescriptor): SyncMetricStorage<Maybe<Accumulation>> {
instrument = createInstrumentDescriptorWithView(view, instrument);
const aggregator = view.aggregation.createAggregator(instrument);
return new SyncMetricStorage(instrument, aggregator, view.attributesProcessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,26 @@

import { Context } from '@opentelemetry/api';
import { MetricAttributes } from '@opentelemetry/api-metrics';
import { AttributeHashMap } from './HashMap';

/**
* Internal interface.
* Internal interface. Stores measurements and allows synchronous writes of
* measurements.
*
* Stores {@link MetricData} and allows synchronous writes of measurements.
* An interface representing SyncMetricStorage with type parameters removed.
*/
export interface WritableMetricStorage {
/** Records a measurement. */
record(value: number, attributes: MetricAttributes, context: Context): void;
}

export class NoopWritableMetricStorage implements WritableMetricStorage {
record(_value: number, _attributes: MetricAttributes, _context: Context): void {}
/**
* Internal interface. Stores measurements and allows asynchronous writes of
* measurements.
*
* An interface representing AsyncMetricStorage with type parameters removed.
*/
export interface AsyncWritableMetricStorage {
/** Records a batch of measurements. */
record(measurements: AttributeHashMap<number>): void;
}
Loading

0 comments on commit 772e659

Please sign in to comment.