Skip to content

Commit

Permalink
chore: batch processor, aligning with latest spec changes for environ…
Browse files Browse the repository at this point in the history
…ments variables (#1918)

Co-authored-by: Valentin Marchaud <contact@vmarchaud.fr>
  • Loading branch information
obecny and vmarchaud authored Feb 14, 2021
1 parent 06c7ec7 commit 9f965b0
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 97 deletions.
12 changes: 8 additions & 4 deletions packages/opentelemetry-core/src/utils/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ const DEFAULT_LIST_SEPARATOR = ',';
*/

const ENVIRONMENT_NUMBERS_KEYS = [
'OTEL_BSP_MAX_BATCH_SIZE',
'OTEL_BSP_SCHEDULE_DELAY_MILLIS',
'OTEL_BSP_EXPORT_TIMEOUT',
'OTEL_BSP_MAX_EXPORT_BATCH_SIZE',
'OTEL_BSP_MAX_QUEUE_SIZE',
'OTEL_BSP_SCHEDULE_DELAY',
'OTEL_SAMPLING_PROBABILITY',
'OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT',
'OTEL_SPAN_EVENT_COUNT_LIMIT',
Expand Down Expand Up @@ -92,8 +94,10 @@ export const DEFAULT_ENVIRONMENT: Required<ENVIRONMENT> = {
OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: 1000,
OTEL_SPAN_EVENT_COUNT_LIMIT: 1000,
OTEL_SPAN_LINK_COUNT_LIMIT: 1000,
OTEL_BSP_MAX_BATCH_SIZE: 512,
OTEL_BSP_SCHEDULE_DELAY_MILLIS: 5000,
OTEL_BSP_EXPORT_TIMEOUT: 30000,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE: 512,
OTEL_BSP_MAX_QUEUE_SIZE: 2048,
OTEL_BSP_SCHEDULE_DELAY: 5000,
};

/**
Expand Down
8 changes: 4 additions & 4 deletions packages/opentelemetry-core/test/utils/environment.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ describe('environment', () => {
HOSTNAME: 'hostname',
KUBERNETES_SERVICE_HOST: 'https://k8s.host/',
NAMESPACE: 'namespace',
OTEL_BSP_MAX_BATCH_SIZE: 40,
OTEL_BSP_SCHEDULE_DELAY_MILLIS: 50,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE: 40,
OTEL_BSP_SCHEDULE_DELAY: 50,
OTEL_EXPORTER_JAEGER_AGENT_HOST: 'host.domain.com',
OTEL_EXPORTER_JAEGER_ENDPOINT: 'https://example.com/endpoint',
OTEL_EXPORTER_JAEGER_PASSWORD: 'secret',
Expand Down Expand Up @@ -121,8 +121,8 @@ describe('environment', () => {
assert.strictEqual(env.CONTAINER_NAME, 'container-1');
assert.strictEqual(env.KUBERNETES_SERVICE_HOST, 'https://k8s.host/');
assert.strictEqual(env.OTEL_RESOURCE_ATTRIBUTES, '<attrs>');
assert.strictEqual(env.OTEL_BSP_MAX_BATCH_SIZE, 40);
assert.strictEqual(env.OTEL_BSP_SCHEDULE_DELAY_MILLIS, 50);
assert.strictEqual(env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE, 40);
assert.strictEqual(env.OTEL_BSP_SCHEDULE_DELAY, 50);
});

it('should match invalid values to closest valid equivalent', () => {
Expand Down
111 changes: 78 additions & 33 deletions packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import { SpanExporter } from './SpanExporter';
* the SDK then pushes them to the exporter pipeline.
*/
export class BatchSpanProcessor implements SpanProcessor {
private readonly _bufferSize: number;
private readonly _bufferTimeout: number;
private readonly _maxExportBatchSize: number;
private readonly _maxQueueSize: number;
private readonly _scheduledDelayMillis: number;
private readonly _exportTimeoutMillis: number;

private _finishedSpans: ReadableSpan[] = [];
private _timer: NodeJS.Timeout | undefined;
Expand All @@ -42,21 +44,29 @@ export class BatchSpanProcessor implements SpanProcessor {

constructor(private readonly _exporter: SpanExporter, config?: BufferConfig) {
const env = getEnv();
this._bufferSize =
config && config.bufferSize
? config.bufferSize
: env.OTEL_BSP_MAX_BATCH_SIZE;
this._bufferTimeout =
config && typeof config.bufferTimeout === 'number'
? config.bufferTimeout
: env.OTEL_BSP_SCHEDULE_DELAY_MILLIS;
this._maxExportBatchSize =
typeof config?.maxExportBatchSize === 'number'
? config.maxExportBatchSize
: env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE;
this._maxQueueSize =
typeof config?.maxQueueSize === 'number'
? config?.maxQueueSize
: env.OTEL_BSP_MAX_QUEUE_SIZE;
this._scheduledDelayMillis =
typeof config?.scheduledDelayMillis === 'number'
? config?.scheduledDelayMillis
: env.OTEL_BSP_SCHEDULE_DELAY;
this._exportTimeoutMillis =
typeof config?.exportTimeoutMillis === 'number'
? config?.exportTimeoutMillis
: env.OTEL_BSP_EXPORT_TIMEOUT;
}

forceFlush(): Promise<void> {
if (this._isShutdown) {
return this._shuttingDownPromise;
}
return this._flush();
return this._flushAll();
}

// does nothing.
Expand All @@ -77,7 +87,7 @@ export class BatchSpanProcessor implements SpanProcessor {
this._shuttingDownPromise = new Promise((resolve, reject) => {
Promise.resolve()
.then(() => {
return this._flush();
return this._flushAll();
})
.then(() => {
return this._exporter.shutdown();
Expand All @@ -92,49 +102,84 @@ export class BatchSpanProcessor implements SpanProcessor {

/** Add a span in the buffer. */
private _addToBuffer(span: ReadableSpan) {
if (this._finishedSpans.length >= this._maxQueueSize) {
// limit reached, drop span
return;
}
this._finishedSpans.push(span);
this._maybeStartTimer();
if (this._finishedSpans.length > this._bufferSize) {
this._flush().catch(e => {
globalErrorHandler(e);
});
}
}

/** Send the span data list to exporter */
private _flush(): Promise<void> {
/**
* Send all spans to the exporter respecting the batch size limit
* This function is used only on forceFlush or shutdown,
* for all other cases _flush should be used
* */
private _flushAll(): Promise<void> {
return new Promise((resolve, reject) => {
const promises = [];
// calculate number of batches
const count = Math.ceil(
this._finishedSpans.length / this._maxExportBatchSize
);
for (let i = 0, j = count; i < j; i++) {
promises.push(this._flushOneBatch());
}
Promise.all(promises)
.then(() => {
resolve();
})
.catch(reject);
});
}

private _flushOneBatch(): Promise<void> {
this._clearTimer();
if (this._finishedSpans.length === 0) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
// don't wait anymore for export, this way the next batch can start
reject(new Error('Timeout'));
}, this._exportTimeoutMillis);
// prevent downstream exporter calls from generating spans
context.with(suppressInstrumentation(context.active()), () => {
// Reset the finished spans buffer here because the next invocations of the _flush method
// could pass the same finished spans to the exporter if the buffer is cleared
// outside of the execution of this callback.
this._exporter.export(this._finishedSpans.splice(0), result => {
if (result.code === ExportResultCode.SUCCESS) {
resolve();
} else {
reject(
result.error ??
new Error('BatchSpanProcessor: span export failed')
);
this._exporter.export(
this._finishedSpans.splice(0, this._maxExportBatchSize),
result => {
clearTimeout(timer);
if (result.code === ExportResultCode.SUCCESS) {
resolve();
} else {
reject(
result.error ??
new Error('BatchSpanProcessor: span export failed')
);
}
}
});
);
});
});
}

private _maybeStartTimer() {
if (this._timer !== undefined) return;

this._timer = setTimeout(() => {
this._flush().catch(e => {
globalErrorHandler(e);
});
}, this._bufferTimeout);
this._flushOneBatch()
.catch(e => {
globalErrorHandler(e);
})
.then(() => {
if (this._finishedSpans.length > 0) {
this._clearTimer();
this._maybeStartTimer();
}
});
}, this._scheduledDelayMillis);
unrefTimer(this._timer);
}

Expand Down
19 changes: 15 additions & 4 deletions packages/opentelemetry-tracing/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,19 @@ export interface TraceParams {

/** Interface configuration for a buffer. */
export interface BufferConfig {
/** Maximum size of a buffer. */
bufferSize?: number;
/** Max time for a buffer can wait before being sent */
bufferTimeout?: number;
/** The maximum batch size of every export. It must be smaller or equal to
* maxQueueSize. The default value is 512. */
maxExportBatchSize?: number;

/** The delay interval in milliseconds between two consecutive exports.
* The default value is 5000ms. */
scheduledDelayMillis?: number;

/** How long the export can run before it is cancelled.
* The default value is 30000ms */
exportTimeoutMillis?: number;

/** The maximum queue size. After the size is reached spans are dropped.
* The default value is 2048. */
maxQueueSize?: number;
}
Loading

0 comments on commit 9f965b0

Please sign in to comment.