Skip to content

Commit

Permalink
Merge branch 'main' into fix/open-telemetry#2671
Browse files Browse the repository at this point in the history
  • Loading branch information
vmarchaud authored Jan 8, 2022
2 parents 42b8409 + 354c002 commit 06c5669
Show file tree
Hide file tree
Showing 22 changed files with 981 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,143 @@
* limitations under the License.
*/

import * as api from '@opentelemetry/api';
import { AggregationTemporality } from './AggregationTemporality';
import { MetricExporter } from './MetricExporter';
import { MetricProducer } from './MetricProducer';
import { MetricData } from './MetricData';
import { callWithTimeout } from '../utils';

export type ReaderOptions = {
timeoutMillis?: number
}

export type ReaderCollectionOptions = ReaderOptions;

export type ReaderShutdownOptions = ReaderOptions;

export type ReaderForceFlushOptions = ReaderOptions;

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader

/**
* A registered reader of metrics that, when linked to a {@link MetricProducer}, offers global
* control over metrics.
*/
export abstract class MetricReader {
// Tracks the shutdown state.
// TODO: use BindOncePromise here once a new version of @opentelemetry/core is available.
private _shutdown = false;
// MetricProducer used by this instance.
private _metricProducer?: MetricProducer;

constructor(private _exporter: MetricExporter) {}
constructor(private readonly _preferredAggregationTemporality = AggregationTemporality.CUMULATIVE) {
}

/**
* Set the {@link MetricProducer} used by this instance.
*
* @param metricProducer
*/
setMetricProducer(metricProducer: MetricProducer) {
this._metricProducer = metricProducer;
this.onInitialized();
}

/**
* Get the {@link AggregationTemporality} preferred by this {@link MetricReader}
*/
getPreferredAggregationTemporality(): AggregationTemporality {
return this._exporter.getPreferredAggregationTemporality();
return this._preferredAggregationTemporality;
}

/**
* Handle once the SDK has initialized this {@link MetricReader}
* Overriding this method is optional.
*/
protected onInitialized(): void {
// Default implementation is empty.
}

async collect(): Promise<void> {
/**
* Handle a shutdown signal by the SDK.
*
* <p> For push exporters, this should shut down any intervals and close any open connections.
* @protected
*/
protected abstract onShutdown(): Promise<void>;

/**
* Handle a force flush signal by the SDK.
*
* <p> In all scenarios metrics should be collected via {@link collect()}.
* <p> For push exporters, this should collect and report metrics.
* @protected
*/
protected abstract onForceFlush(): Promise<void>;

/**
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: ReaderCollectionOptions): Promise<MetricData[]> {
if (this._metricProducer === undefined) {
throw new Error('MetricReader is not bound to a MeterProvider');
throw new Error('MetricReader is not bound to a MetricProducer');
}
const metrics = await this._metricProducer.collect();

// errors thrown to caller
await this._exporter.export(metrics);
// Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls.
if (this._shutdown) {
api.diag.warn('Collection is not allowed after shutdown');
return [];
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
return await this._metricProducer.collect();
}

return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis);
}

async shutdown(): Promise<void> {
/**
* Shuts down the metric reader, the promise will reject after the optional timeout or resolve after completion.
*
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
* @param options options with timeout.
*/
async shutdown(options?: ReaderShutdownOptions): Promise<void> {
// Do not call shutdown again if it has already been called.
if (this._shutdown) {
api.diag.error('Cannot call shutdown twice.');
return;
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
await this.onShutdown();
} else {
await callWithTimeout(this.onShutdown(), options.timeoutMillis);
}

this._shutdown = true;
// errors thrown to caller
await this._exporter.shutdown();
}

async forceFlush(): Promise<void> {
/**
* Flushes metrics read by this reader, the promise will reject after the optional timeout or resolve after completion.
*
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
* @param options options with timeout.
*/
async forceFlush(options?: ReaderForceFlushOptions): Promise<void> {
if (this._shutdown) {
api.diag.warn('Cannot forceFlush on already shutdown MetricReader.');
return;
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
await this.onForceFlush();
return;
}

// errors thrown to caller
await this._exporter.forceFlush();
await callWithTimeout(this.onForceFlush(), options.timeoutMillis);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as api from '@opentelemetry/api';
import { MetricReader } from './MetricReader';
import { MetricExporter } from './MetricExporter';
import { callWithTimeout, TimeoutError } from '../utils';

export type PeriodicExportingMetricReaderOptions = {
exporter: MetricExporter
exportIntervalMillis?: number,
exportTimeoutMillis?: number
}

/**
* {@link MetricReader} which collects metrics based on a user-configurable time interval, and passes the metrics to
* the configured {@link MetricExporter}
*/
export class PeriodicExportingMetricReader extends MetricReader {
private _interval?: ReturnType<typeof setInterval>;

private _exporter: MetricExporter;

private readonly _exportInterval: number;

private readonly _exportTimeout: number;

constructor(options: PeriodicExportingMetricReaderOptions) {
super(options.exporter.getPreferredAggregationTemporality());

if (options.exportIntervalMillis !== undefined && options.exportIntervalMillis <= 0) {
throw Error('exportIntervalMillis must be greater than 0');
}

if (options.exportTimeoutMillis !== undefined && options.exportTimeoutMillis <= 0) {
throw Error('exportTimeoutMillis must be greater than 0');
}

if (options.exportTimeoutMillis !== undefined &&
options.exportIntervalMillis !== undefined &&
options.exportIntervalMillis < options.exportTimeoutMillis) {
throw Error('exportIntervalMillis must be greater than or equal to exportTimeoutMillis');
}

this._exportInterval = options.exportIntervalMillis ?? 60000;
this._exportTimeout = options.exportTimeoutMillis ?? 30000;
this._exporter = options.exporter;
}

private async _runOnce(): Promise<void> {
const metrics = await this.collect({});
await this._exporter.export(metrics);
}

protected override onInitialized(): void {
// start running the interval as soon as this reader is initialized and keep handle for shutdown.
this._interval = setInterval(async () => {
try {
await callWithTimeout(this._runOnce(), this._exportTimeout);
} catch (err) {
if (err instanceof TimeoutError) {
api.diag.error('Export took longer than %s milliseconds and timed out.', this._exportTimeout);
return;
}

api.diag.error('Unexpected error during export: %s', err);
}
}, this._exportInterval);
}

protected async onForceFlush(): Promise<void> {
await this._exporter.forceFlush();
}

protected async onShutdown(): Promise<void> {
if (this._interval) {
clearInterval(this._interval);
}

await this._exporter.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ export class MetricCollector implements MetricProducer {
* Delegates for MetricReader.forceFlush.
*/
async forceFlush(): Promise<void> {
return this._metricReader.forceFlush();
await this._metricReader.forceFlush();
}

/**
* Delegates for MetricReader.shutdown.
*/
async shutdown(): Promise<void> {
return this._metricReader.shutdown();
await this._metricReader.shutdown();
}
}

Expand Down
44 changes: 44 additions & 0 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,47 @@ export function hashAttributes(attributes: Attributes): string {
return (result += key + ':' + attributes[key]);
}, '|#');
}

/**
* Error that is thrown on timeouts.
*/
export class TimeoutError extends Error {
constructor(message?: string) {
super(message);

// manually adjust prototype to retain `instanceof` functionality when targeting ES5, see:
// https://github.com/Microsoft/TypeScript-wiki/blob/main/Breaking-Changes.md#extending-built-ins-like-error-array-and-map-may-no-longer-work
Object.setPrototypeOf(this, TimeoutError.prototype);
}
}

/**
* Adds a timeout to a promise and rejects if the specified timeout has elapsed. Also rejects if the specified promise
* rejects, and resolves if the specified promise resolves.
*
* <p> NOTE: this operation will continue even after it throws a {@link TimeoutError}.
*
* @param promise promise to use with timeout.
* @param timeout the timeout in milliseconds until the returned promise is rejected.
*/
export function callWithTimeout<T>(promise: Promise<T>, timeout: number): Promise<T> {
let timeoutHandle: ReturnType<typeof setTimeout>;

const timeoutPromise = new Promise<never>(function timeoutFunction(_resolve, reject) {
timeoutHandle = setTimeout(
function timeoutHandler() {
reject(new TimeoutError('Operation timed out.'));
},
timeout
);
});

return Promise.race([promise, timeoutPromise]).then(result => {
clearTimeout(timeoutHandle);
return result;
},
reason => {
clearTimeout(timeoutHandle);
throw reason;
});
}
Loading

0 comments on commit 06c5669

Please sign in to comment.