chore: batch processor, aligning with latest spec changes for environment variables #1918

Merged 5 commits on Feb 14, 2021
12 changes: 8 additions & 4 deletions packages/opentelemetry-core/src/utils/environment.ts
@@ -23,8 +23,10 @@ const DEFAULT_LIST_SEPARATOR = ',';
*/

const ENVIRONMENT_NUMBERS_KEYS = [
'OTEL_BSP_MAX_BATCH_SIZE',
'OTEL_BSP_SCHEDULE_DELAY_MILLIS',
'OTEL_BSP_EXPORT_TIMEOUT',
'OTEL_BSP_MAX_EXPORT_BATCH_SIZE',
'OTEL_BSP_MAX_QUEUE_SIZE',
'OTEL_BSP_SCHEDULE_DELAY',
'OTEL_SAMPLING_PROBABILITY',
'OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT',
'OTEL_SPAN_EVENT_COUNT_LIMIT',
@@ -92,8 +94,10 @@ export const DEFAULT_ENVIRONMENT: Required<ENVIRONMENT> = {
OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: 1000,
OTEL_SPAN_EVENT_COUNT_LIMIT: 1000,
OTEL_SPAN_LINK_COUNT_LIMIT: 1000,
OTEL_BSP_MAX_BATCH_SIZE: 512,
OTEL_BSP_SCHEDULE_DELAY_MILLIS: 5000,
OTEL_BSP_EXPORT_TIMEOUT: 30000,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE: 512,
OTEL_BSP_MAX_QUEUE_SIZE: 2048,
OTEL_BSP_SCHEDULE_DELAY: 5000,
};

/**
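For orientation, a minimal sketch (not part of this diff) of how the renamed variables are picked up, assuming getEnv() is imported from @opentelemetry/core the same way BatchSpanProcessor.ts does below; values set in process.env are parsed into numbers, and unset keys fall back to the defaults above.

import { getEnv } from '@opentelemetry/core';

// New spec-compliant names; string values are parsed into numbers.
process.env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE = '256';
process.env.OTEL_BSP_SCHEDULE_DELAY = '1000';

const env = getEnv();
console.log(env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE); // 256
console.log(env.OTEL_BSP_SCHEDULE_DELAY);        // 1000
console.log(env.OTEL_BSP_MAX_QUEUE_SIZE);        // 2048 (default)
console.log(env.OTEL_BSP_EXPORT_TIMEOUT);        // 30000 (default)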
8 changes: 4 additions & 4 deletions packages/opentelemetry-core/test/utils/environment.test.ts
@@ -80,8 +80,8 @@ describe('environment', () => {
HOSTNAME: 'hostname',
KUBERNETES_SERVICE_HOST: 'https://k8s.host/',
NAMESPACE: 'namespace',
OTEL_BSP_MAX_BATCH_SIZE: 40,
OTEL_BSP_SCHEDULE_DELAY_MILLIS: 50,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE: 40,
OTEL_BSP_SCHEDULE_DELAY: 50,
OTEL_EXPORTER_JAEGER_AGENT_HOST: 'host.domain.com',
OTEL_EXPORTER_JAEGER_ENDPOINT: 'https://example.com/endpoint',
OTEL_EXPORTER_JAEGER_PASSWORD: 'secret',
@@ -121,8 +121,8 @@ describe('environment', () => {
assert.strictEqual(env.CONTAINER_NAME, 'container-1');
assert.strictEqual(env.KUBERNETES_SERVICE_HOST, 'https://k8s.host/');
assert.strictEqual(env.OTEL_RESOURCE_ATTRIBUTES, '<attrs>');
assert.strictEqual(env.OTEL_BSP_MAX_BATCH_SIZE, 40);
assert.strictEqual(env.OTEL_BSP_SCHEDULE_DELAY_MILLIS, 50);
assert.strictEqual(env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE, 40);
assert.strictEqual(env.OTEL_BSP_SCHEDULE_DELAY, 50);
});

it('should match invalid values to closest valid equivalent', () => {
111 changes: 78 additions & 33 deletions packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts
@@ -32,8 +32,10 @@ import { SpanExporter } from './SpanExporter';
* the SDK then pushes them to the exporter pipeline.
*/
export class BatchSpanProcessor implements SpanProcessor {
private readonly _bufferSize: number;
private readonly _bufferTimeout: number;
private readonly _maxExportBatchSize: number;
private readonly _maxQueueSize: number;
private readonly _scheduledDelayMillis: number;
private readonly _exportTimeoutMillis: number;

private _finishedSpans: ReadableSpan[] = [];
private _timer: NodeJS.Timeout | undefined;
@@ -42,21 +44,29 @@ export class BatchSpanProcessor implements SpanProcessor {

constructor(private readonly _exporter: SpanExporter, config?: BufferConfig) {
const env = getEnv();
this._bufferSize =
config && config.bufferSize
? config.bufferSize
: env.OTEL_BSP_MAX_BATCH_SIZE;
this._bufferTimeout =
config && typeof config.bufferTimeout === 'number'
? config.bufferTimeout
: env.OTEL_BSP_SCHEDULE_DELAY_MILLIS;
this._maxExportBatchSize =
typeof config?.maxExportBatchSize === 'number'
? config.maxExportBatchSize
: env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE;
this._maxQueueSize =
typeof config?.maxQueueSize === 'number'
? config?.maxQueueSize
: env.OTEL_BSP_MAX_QUEUE_SIZE;
this._scheduledDelayMillis =
typeof config?.scheduledDelayMillis === 'number'
? config?.scheduledDelayMillis
: env.OTEL_BSP_SCHEDULE_DELAY;
this._exportTimeoutMillis =
typeof config?.exportTimeoutMillis === 'number'
? config?.exportTimeoutMillis
: env.OTEL_BSP_EXPORT_TIMEOUT;
}

forceFlush(): Promise<void> {
if (this._isShutdown) {
return this._shuttingDownPromise;
}
return this._flush();
return this._flushAll();
}

// does nothing.
@@ -77,7 +87,7 @@
this._shuttingDownPromise = new Promise((resolve, reject) => {
Promise.resolve()
.then(() => {
return this._flush();
return this._flushAll();
})
.then(() => {
return this._exporter.shutdown();
@@ -92,49 +102,84 @@

/** Add a span in the buffer. */
private _addToBuffer(span: ReadableSpan) {
if (this._finishedSpans.length >= this._maxQueueSize) {
// limit reached, drop span
return;
}
this._finishedSpans.push(span);
this._maybeStartTimer();
if (this._finishedSpans.length > this._bufferSize) {
this._flush().catch(e => {
globalErrorHandler(e);
});
}
}

/** Send the span data list to exporter */
private _flush(): Promise<void> {
/**
* Send all spans to the exporter respecting the batch size limit
* This function is used only on forceFlush or shutdown,
* for all other cases _flushOneBatch should be used
* */
private _flushAll(): Promise<void> {
return new Promise((resolve, reject) => {
const promises = [];
// calculate number of batches
const count = Math.ceil(
this._finishedSpans.length / this._maxExportBatchSize
);
for (let i = 0, j = count; i < j; i++) {
promises.push(this._flushOneBatch());
}
Promise.all(promises)
.then(() => {
resolve();
})
.catch(reject);
});
}

private _flushOneBatch(): Promise<void> {
this._clearTimer();
if (this._finishedSpans.length === 0) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
// don't wait anymore for export, this way the next batch can start
reject(new Error('Timeout'));
}, this._exportTimeoutMillis);
// prevent downstream exporter calls from generating spans
context.with(suppressInstrumentation(context.active()), () => {
// Reset the finished spans buffer here because the next invocations of the _flush method
// could pass the same finished spans to the exporter if the buffer is cleared
// outside of the execution of this callback.
this._exporter.export(this._finishedSpans.splice(0), result => {
if (result.code === ExportResultCode.SUCCESS) {
resolve();
} else {
reject(
result.error ??
new Error('BatchSpanProcessor: span export failed')
);
this._exporter.export(
this._finishedSpans.splice(0, this._maxExportBatchSize),
result => {
clearTimeout(timer);
if (result.code === ExportResultCode.SUCCESS) {
resolve();
} else {
reject(
result.error ??
new Error('BatchSpanProcessor: span export failed')
);
}
}
});
);
});
});
}

private _maybeStartTimer() {
if (this._timer !== undefined) return;

this._timer = setTimeout(() => {
this._flush().catch(e => {
globalErrorHandler(e);
});
}, this._bufferTimeout);
this._flushOneBatch()
.catch(e => {
globalErrorHandler(e);
})
.then(() => {
if (this._finishedSpans.length > 0) {
this._clearTimer();
this._maybeStartTimer();
}
});
}, this._scheduledDelayMillis);
unrefTimer(this._timer);
}

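As an aside, a hedged illustration of the batch-count arithmetic _flushAll() performs before delegating to _flushOneBatch(); the span count below is made up for the example and does not come from the PR.

// Illustrative numbers only: 1200 queued spans with the default
// OTEL_BSP_MAX_EXPORT_BATCH_SIZE of 512 yield three export calls.
const queuedSpans = 1200;
const maxExportBatchSize = 512;
const batchCount = Math.ceil(queuedSpans / maxExportBatchSize); // 3
// _flushOneBatch() splices at most maxExportBatchSize spans per call,
// so the exporter receives batches of 512, 512, and 176 spans.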
19 changes: 15 additions & 4 deletions packages/opentelemetry-tracing/src/types.ts
@@ -75,8 +75,19 @@ export interface TraceParams {

/** Interface configuration for a buffer. */
export interface BufferConfig {
/** Maximum size of a buffer. */
bufferSize?: number;
/** Max time for a buffer can wait before being sent */
bufferTimeout?: number;
/** The maximum batch size of every export. It must be less than or equal to
* maxQueueSize. The default value is 512. */
maxExportBatchSize?: number;

/** The delay interval in milliseconds between two consecutive exports.
* The default value is 5000ms. */
scheduledDelayMillis?: number;

/** How long the export can run before it is cancelled.
* The default value is 30000ms. */
exportTimeoutMillis?: number;

/** The maximum queue size. After the size is reached spans are dropped.
* The default value is 2048. */
maxQueueSize?: number;
}
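To show the renamed options in context, here is a minimal usage sketch; it assumes BasicTracerProvider and ConsoleSpanExporter from @opentelemetry/tracing, which are outside this diff, and the configuration values are arbitrary.

import {
  BasicTracerProvider,
  BatchSpanProcessor,
  ConsoleSpanExporter,
} from '@opentelemetry/tracing';

const provider = new BasicTracerProvider();

// Explicit config takes precedence over the OTEL_BSP_* environment defaults.
provider.addSpanProcessor(
  new BatchSpanProcessor(new ConsoleSpanExporter(), {
    maxQueueSize: 1000,         // spans beyond this many queued are dropped
    maxExportBatchSize: 100,    // must be <= maxQueueSize
    scheduledDelayMillis: 1000, // timer interval between batch exports
    exportTimeoutMillis: 10000, // a batch export slower than this is rejected
  })
);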