diff --git a/packages/opentelemetry-core/src/utils/environment.ts b/packages/opentelemetry-core/src/utils/environment.ts index 2088772ca2..5b3a49b174 100644 --- a/packages/opentelemetry-core/src/utils/environment.ts +++ b/packages/opentelemetry-core/src/utils/environment.ts @@ -23,8 +23,10 @@ const DEFAULT_LIST_SEPARATOR = ','; */ const ENVIRONMENT_NUMBERS_KEYS = [ - 'OTEL_BSP_MAX_BATCH_SIZE', - 'OTEL_BSP_SCHEDULE_DELAY_MILLIS', + 'OTEL_BSP_EXPORT_TIMEOUT', + 'OTEL_BSP_MAX_EXPORT_BATCH_SIZE', + 'OTEL_BSP_MAX_QUEUE_SIZE', + 'OTEL_BSP_SCHEDULE_DELAY', 'OTEL_SAMPLING_PROBABILITY', 'OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT', 'OTEL_SPAN_EVENT_COUNT_LIMIT', @@ -92,8 +94,10 @@ export const DEFAULT_ENVIRONMENT: Required = { OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: 1000, OTEL_SPAN_EVENT_COUNT_LIMIT: 1000, OTEL_SPAN_LINK_COUNT_LIMIT: 1000, - OTEL_BSP_MAX_BATCH_SIZE: 512, - OTEL_BSP_SCHEDULE_DELAY_MILLIS: 5000, + OTEL_BSP_EXPORT_TIMEOUT: 30000, + OTEL_BSP_MAX_EXPORT_BATCH_SIZE: 512, + OTEL_BSP_MAX_QUEUE_SIZE: 2048, + OTEL_BSP_SCHEDULE_DELAY: 5000, }; /** diff --git a/packages/opentelemetry-core/test/utils/environment.test.ts b/packages/opentelemetry-core/test/utils/environment.test.ts index 35c9bdcb5a..100a0486b6 100644 --- a/packages/opentelemetry-core/test/utils/environment.test.ts +++ b/packages/opentelemetry-core/test/utils/environment.test.ts @@ -80,8 +80,8 @@ describe('environment', () => { HOSTNAME: 'hostname', KUBERNETES_SERVICE_HOST: 'https://k8s.host/', NAMESPACE: 'namespace', - OTEL_BSP_MAX_BATCH_SIZE: 40, - OTEL_BSP_SCHEDULE_DELAY_MILLIS: 50, + OTEL_BSP_MAX_EXPORT_BATCH_SIZE: 40, + OTEL_BSP_SCHEDULE_DELAY: 50, OTEL_EXPORTER_JAEGER_AGENT_HOST: 'host.domain.com', OTEL_EXPORTER_JAEGER_ENDPOINT: 'https://example.com/endpoint', OTEL_EXPORTER_JAEGER_PASSWORD: 'secret', @@ -121,8 +121,8 @@ describe('environment', () => { assert.strictEqual(env.CONTAINER_NAME, 'container-1'); assert.strictEqual(env.KUBERNETES_SERVICE_HOST, 'https://k8s.host/'); assert.strictEqual(env.OTEL_RESOURCE_ATTRIBUTES, ''); - assert.strictEqual(env.OTEL_BSP_MAX_BATCH_SIZE, 40); - assert.strictEqual(env.OTEL_BSP_SCHEDULE_DELAY_MILLIS, 50); + assert.strictEqual(env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE, 40); + assert.strictEqual(env.OTEL_BSP_SCHEDULE_DELAY, 50); }); it('should match invalid values to closest valid equivalent', () => { diff --git a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts index 3b42d1f072..8aaa15b13b 100644 --- a/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts +++ b/packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts @@ -32,8 +32,10 @@ import { SpanExporter } from './SpanExporter'; * the SDK then pushes them to the exporter pipeline. */ export class BatchSpanProcessor implements SpanProcessor { - private readonly _bufferSize: number; - private readonly _bufferTimeout: number; + private readonly _maxExportBatchSize: number; + private readonly _maxQueueSize: number; + private readonly _scheduledDelayMillis: number; + private readonly _exportTimeoutMillis: number; private _finishedSpans: ReadableSpan[] = []; private _timer: NodeJS.Timeout | undefined; @@ -42,21 +44,29 @@ export class BatchSpanProcessor implements SpanProcessor { constructor(private readonly _exporter: SpanExporter, config?: BufferConfig) { const env = getEnv(); - this._bufferSize = - config && config.bufferSize - ? config.bufferSize - : env.OTEL_BSP_MAX_BATCH_SIZE; - this._bufferTimeout = - config && typeof config.bufferTimeout === 'number' - ? config.bufferTimeout - : env.OTEL_BSP_SCHEDULE_DELAY_MILLIS; + this._maxExportBatchSize = + typeof config?.maxExportBatchSize === 'number' + ? config.maxExportBatchSize + : env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE; + this._maxQueueSize = + typeof config?.maxQueueSize === 'number' + ? config?.maxQueueSize + : env.OTEL_BSP_MAX_QUEUE_SIZE; + this._scheduledDelayMillis = + typeof config?.scheduledDelayMillis === 'number' + ? config?.scheduledDelayMillis + : env.OTEL_BSP_SCHEDULE_DELAY; + this._exportTimeoutMillis = + typeof config?.exportTimeoutMillis === 'number' + ? config?.exportTimeoutMillis + : env.OTEL_BSP_EXPORT_TIMEOUT; } forceFlush(): Promise { if (this._isShutdown) { return this._shuttingDownPromise; } - return this._flush(); + return this._flushAll(); } // does nothing. @@ -77,7 +87,7 @@ export class BatchSpanProcessor implements SpanProcessor { this._shuttingDownPromise = new Promise((resolve, reject) => { Promise.resolve() .then(() => { - return this._flush(); + return this._flushAll(); }) .then(() => { return this._exporter.shutdown(); @@ -92,49 +102,84 @@ export class BatchSpanProcessor implements SpanProcessor { /** Add a span in the buffer. */ private _addToBuffer(span: ReadableSpan) { + if (this._finishedSpans.length >= this._maxQueueSize) { + // limit reached, drop span + return; + } this._finishedSpans.push(span); this._maybeStartTimer(); - if (this._finishedSpans.length > this._bufferSize) { - this._flush().catch(e => { - globalErrorHandler(e); - }); - } } - /** Send the span data list to exporter */ - private _flush(): Promise { + /** + * Send all spans to the exporter respecting the batch size limit + * This function is used only on forceFlush or shutdown, + * for all other cases _flush should be used + * */ + private _flushAll(): Promise { + return new Promise((resolve, reject) => { + const promises = []; + // calculate number of batches + const count = Math.ceil( + this._finishedSpans.length / this._maxExportBatchSize + ); + for (let i = 0, j = count; i < j; i++) { + promises.push(this._flushOneBatch()); + } + Promise.all(promises) + .then(() => { + resolve(); + }) + .catch(reject); + }); + } + + private _flushOneBatch(): Promise { this._clearTimer(); if (this._finishedSpans.length === 0) { return Promise.resolve(); } return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + // don't wait anymore for export, this way the next batch can start + reject(new Error('Timeout')); + }, this._exportTimeoutMillis); // prevent downstream exporter calls from generating spans context.with(suppressInstrumentation(context.active()), () => { // Reset the finished spans buffer here because the next invocations of the _flush method // could pass the same finished spans to the exporter if the buffer is cleared // outside of the execution of this callback. - this._exporter.export(this._finishedSpans.splice(0), result => { - if (result.code === ExportResultCode.SUCCESS) { - resolve(); - } else { - reject( - result.error ?? - new Error('BatchSpanProcessor: span export failed') - ); + this._exporter.export( + this._finishedSpans.splice(0, this._maxExportBatchSize), + result => { + clearTimeout(timer); + if (result.code === ExportResultCode.SUCCESS) { + resolve(); + } else { + reject( + result.error ?? + new Error('BatchSpanProcessor: span export failed') + ); + } } - }); + ); }); }); } private _maybeStartTimer() { if (this._timer !== undefined) return; - this._timer = setTimeout(() => { - this._flush().catch(e => { - globalErrorHandler(e); - }); - }, this._bufferTimeout); + this._flushOneBatch() + .catch(e => { + globalErrorHandler(e); + }) + .then(() => { + if (this._finishedSpans.length > 0) { + this._clearTimer(); + this._maybeStartTimer(); + } + }); + }, this._scheduledDelayMillis); unrefTimer(this._timer); } diff --git a/packages/opentelemetry-tracing/src/types.ts b/packages/opentelemetry-tracing/src/types.ts index eb0caba670..0102c902ab 100644 --- a/packages/opentelemetry-tracing/src/types.ts +++ b/packages/opentelemetry-tracing/src/types.ts @@ -75,8 +75,19 @@ export interface TraceParams { /** Interface configuration for a buffer. */ export interface BufferConfig { - /** Maximum size of a buffer. */ - bufferSize?: number; - /** Max time for a buffer can wait before being sent */ - bufferTimeout?: number; + /** The maximum batch size of every export. It must be smaller or equal to + * maxQueueSize. The default value is 512. */ + maxExportBatchSize?: number; + + /** The delay interval in milliseconds between two consecutive exports. + * The default value is 5000ms. */ + scheduledDelayMillis?: number; + + /** How long the export can run before it is cancelled. + * The default value is 30000ms */ + exportTimeoutMillis?: number; + + /** The maximum queue size. After the size is reached spans are dropped. + * The default value is 2048. */ + maxQueueSize?: number; } diff --git a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts index 106a45ed21..27752d88ed 100644 --- a/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts +++ b/packages/opentelemetry-tracing/test/export/BatchSpanProcessor.test.ts @@ -44,8 +44,8 @@ function createSampledSpan(spanName: string): Span { describe('BatchSpanProcessor', () => { const name = 'span-name'; const defaultBufferConfig = { - bufferSize: 5, - bufferTimeout: 2000, + maxExportBatchSize: 5, + scheduledDelayMillis: 2500, }; let exporter: InMemorySpanExporter; beforeEach(() => { @@ -77,8 +77,8 @@ describe('BatchSpanProcessor', () => { it('should read defaults from environment', () => { const bspConfig = { - OTEL_BSP_MAX_BATCH_SIZE: 256, - OTEL_BSP_SCHEDULE_DELAY_MILLIS: 2500, + OTEL_BSP_MAX_EXPORT_BATCH_SIZE: 256, + OTEL_BSP_SCHEDULE_DELAY: 2500, }; let env: Record; @@ -94,8 +94,8 @@ describe('BatchSpanProcessor', () => { const processor = new BatchSpanProcessor(exporter); assert.ok(processor instanceof BatchSpanProcessor); - assert.strictEqual(processor['_bufferSize'], 256); - assert.strictEqual(processor['_bufferTimeout'], 2500); + assert.strictEqual(processor['_maxExportBatchSize'], 256); + assert.strictEqual(processor['_scheduledDelayMillis'], 2500); processor.shutdown(); Object.keys(bspConfig).forEach(k => delete env[k]); @@ -129,9 +129,10 @@ describe('BatchSpanProcessor', () => { assert.strictEqual(exporter.getFinishedSpans().length, 0); }); - it('should export the sampled spans with buffer size reached', async () => { + it('should export the sampled spans with buffer size reached', done => { + const clock = sinon.useFakeTimers(); const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.bufferSize; i++) { + for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { const span = createSampledSpan(`${name}_${i}`); processor.onStart(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); @@ -139,19 +140,23 @@ describe('BatchSpanProcessor', () => { processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); } - // Now we should start seeing the spans in exporter const span = createSampledSpan(`${name}_6`); processor.onEnd(span); - assert.strictEqual(exporter.getFinishedSpans().length, 6); - await processor.shutdown(); - assert.strictEqual(exporter.getFinishedSpans().length, 0); + setTimeout(async () => { + assert.strictEqual(exporter.getFinishedSpans().length, 5); + await processor.shutdown(); + assert.strictEqual(exporter.getFinishedSpans().length, 0); + done(); + }, defaultBufferConfig.scheduledDelayMillis + 1000); + clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); + clock.restore(); }); it('should force flush when timeout exceeded', done => { const clock = sinon.useFakeTimers(); const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.bufferSize; i++) { + for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { const span = createSampledSpan(`${name}_${i}`); processor.onEnd(span); assert.strictEqual(exporter.getFinishedSpans().length, 0); @@ -160,16 +165,16 @@ describe('BatchSpanProcessor', () => { setTimeout(() => { assert.strictEqual(exporter.getFinishedSpans().length, 5); done(); - }, defaultBufferConfig.bufferTimeout + 1000); + }, defaultBufferConfig.scheduledDelayMillis + 1000); - clock.tick(defaultBufferConfig.bufferTimeout + 1000); + clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); clock.restore(); }); it('should force flush on demand', () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - for (let i = 0; i < defaultBufferConfig.bufferSize; i++) { + for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { const span = createSampledSpan(`${name}_${i}`); processor.onEnd(span); } @@ -188,7 +193,7 @@ describe('BatchSpanProcessor', () => { const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); // start but do not end spans - for (let i = 0; i < defaultBufferConfig.bufferSize; i++) { + for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { const span = tracer.startSpan('spanName'); processor.onStart(span as Span); } @@ -199,31 +204,52 @@ describe('BatchSpanProcessor', () => { // because no spans are ended sinon.assert.notCalled(spy); done(); - }, defaultBufferConfig.bufferTimeout + 1000); + }, defaultBufferConfig.scheduledDelayMillis + 1000); // no spans have been finished assert.strictEqual(exporter.getFinishedSpans().length, 0); - clock.tick(defaultBufferConfig.bufferTimeout + 1000); + clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); clock.restore(); }); - it('should export each sampled span exactly once with buffer size reached multiple times', async () => { - const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - const totalSpans = defaultBufferConfig.bufferSize * 2; - for (let i = 0; i <= totalSpans; i++) { - const span = createSampledSpan(`${name}_${i}`); - + it( + 'should export each sampled span exactly once with buffer size' + + ' reached multiple times', + done => { + const originalTimeout = setTimeout; + const clock = sinon.useFakeTimers(); + const processor = new BatchSpanProcessor(exporter, defaultBufferConfig); + const totalSpans = defaultBufferConfig.maxExportBatchSize * 2; + for (let i = 0; i < totalSpans; i++) { + const span = createSampledSpan(`${name}_${i}`); + processor.onEnd(span); + } + const span = createSampledSpan(`${name}_last`); processor.onEnd(span); + clock.tick(defaultBufferConfig.scheduledDelayMillis + 10); + + // because there is an async promise that will be trigger original + // timeout is needed to simulate a real tick to the next + originalTimeout(() => { + clock.tick(defaultBufferConfig.scheduledDelayMillis + 10); + originalTimeout(async () => { + clock.tick(defaultBufferConfig.scheduledDelayMillis + 10); + clock.restore(); + + console.log(exporter.getFinishedSpans().length); + assert.strictEqual( + exporter.getFinishedSpans().length, + totalSpans + 1 + ); + + await processor.shutdown(); + assert.strictEqual(exporter.getFinishedSpans().length, 0); + done(); + }); + }); } - // Now we should start seeing the spans in exporter - const span = createSampledSpan(`${name}_last`); - processor.onEnd(span); - assert.strictEqual(exporter.getFinishedSpans().length, totalSpans + 2); - - await processor.shutdown(); - assert.strictEqual(exporter.getFinishedSpans().length, 0); - }); + ); }); describe('force flush', () => { @@ -248,14 +274,12 @@ describe('BatchSpanProcessor', () => { beforeEach(() => { processor = new BatchSpanProcessor(exporter, defaultBufferConfig); - const span = createSampledSpan('test'); - processor.onStart(span); - processor.onEnd(span); - - assert.strictEqual(processor['_finishedSpans'].length, 1); }); it('should call an async callback when flushing is complete', done => { + const span = createSampledSpan('test'); + processor.onStart(span); + processor.onEnd(span); processor.forceFlush().then(() => { assert.strictEqual(exporter.getFinishedSpans().length, 1); done(); @@ -270,6 +294,9 @@ describe('BatchSpanProcessor', () => { callback({ code: ExportResultCode.SUCCESS }); }, 0); }); + const span = createSampledSpan('test'); + processor.onStart(span); + processor.onEnd(span); processor.shutdown().then(() => { assert.strictEqual(exportedSpans, 1); @@ -277,7 +304,8 @@ describe('BatchSpanProcessor', () => { }); }); - it('should call globalErrorHandler when exporting fails', async () => { + it('should call globalErrorHandler when exporting fails', done => { + const clock = sinon.useFakeTimers(); const expectedError = new Error('Exporter failed'); sinon.stub(exporter, 'export').callsFake((_, callback) => { setTimeout(() => { @@ -289,27 +317,25 @@ describe('BatchSpanProcessor', () => { setGlobalErrorHandler(errorHandlerSpy); - // Cause a flush by emitting more spans then the default buffer size - for (let i = 0; i < defaultBufferConfig.bufferSize; i++) { + for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) { const span = createSampledSpan('test'); processor.onStart(span); processor.onEnd(span); } - await new Promise(resolve => { - setTimeout(() => { - resolve(); - }, 0); - }); + clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000); + clock.restore(); + setTimeout(async () => { + assert.strictEqual(errorHandlerSpy.callCount, 1); - assert.strictEqual(errorHandlerSpy.callCount, 1); + const [[error]] = errorHandlerSpy.args; - const [[error]] = errorHandlerSpy.args; + assert.deepStrictEqual(error, expectedError); - assert.deepStrictEqual(error, expectedError); - - //reset global error handler - setGlobalErrorHandler(loggingErrorHandler()); + //reset global error handler + setGlobalErrorHandler(loggingErrorHandler()); + done(); + }); }); }); @@ -340,4 +366,26 @@ describe('BatchSpanProcessor', () => { }); }); }); + describe('maxQueueSize', () => { + let processor: BatchSpanProcessor; + + describe('when there are more spans then "maxQueueSize"', () => { + beforeEach(() => { + processor = new BatchSpanProcessor( + exporter, + Object.assign({}, defaultBufferConfig, { + maxQueueSize: 6, + }) + ); + }); + it('should drop spans', () => { + const span = createSampledSpan('test'); + for (let i = 0, j = 20; i < j; i++) { + processor.onStart(span); + processor.onEnd(span); + } + assert.equal(processor['_finishedSpans'].length, 6); + }); + }); + }); });