Skip to content

Commit

Permalink
fix(tracing): use globalErrorHandler when flushing fails (#1622)
Browse files Browse the repository at this point in the history
Co-authored-by: Bartlomiej Obecny <bobecny@gmail.com>
  • Loading branch information
johanneswuerbach and obecny authored Oct 30, 2020
1 parent 435d608 commit b523dab
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 8 deletions.
20 changes: 16 additions & 4 deletions packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
*/

import { context, suppressInstrumentation } from '@opentelemetry/api';
import { ExportResult, unrefTimer } from '@opentelemetry/core';
import {
ExportResult,
globalErrorHandler,
unrefTimer,
} from '@opentelemetry/core';
import { Span } from '../Span';
import { SpanProcessor } from '../SpanProcessor';
import { BufferConfig } from '../types';
Expand Down Expand Up @@ -90,7 +94,9 @@ export class BatchSpanProcessor implements SpanProcessor {
this._finishedSpans.push(span);
this._maybeStartTimer();
if (this._finishedSpans.length > this._bufferSize) {
this._flush();
this._flush().catch(e => {
globalErrorHandler(e);
});
}
}

Expand All @@ -108,7 +114,11 @@ export class BatchSpanProcessor implements SpanProcessor {
if (result === ExportResult.SUCCESS) {
resolve();
} else {
reject(result);
reject(
new Error(
`BatchSpanProcessor: span export failed (status ${result})`
)
);
}
});
});
Expand All @@ -119,7 +129,9 @@ export class BatchSpanProcessor implements SpanProcessor {
if (this._timer !== undefined) return;

this._timer = setTimeout(() => {
this._flush().catch();
this._flush().catch(e => {
globalErrorHandler(e);
});
}, this._bufferTimeout);
unrefTimer(this._timer);
}
Expand Down
13 changes: 11 additions & 2 deletions packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
*/

import { context, suppressInstrumentation } from '@opentelemetry/api';
import { ExportResult, globalErrorHandler } from '@opentelemetry/core';
import { Span } from '../Span';
import { SpanExporter } from './SpanExporter';
import { SpanProcessor } from '../SpanProcessor';
import { ReadableSpan } from './ReadableSpan';
import { SpanExporter } from './SpanExporter';

/**
* An implementation of the {@link SpanProcessor} that converts the {@link Span}
Expand Down Expand Up @@ -47,7 +48,15 @@ export class SimpleSpanProcessor implements SpanProcessor {

// prevent downstream exporter calls from generating spans
context.with(suppressInstrumentation(context.active()), () => {
this._exporter.export([span], () => {});
this._exporter.export([span], result => {
if (result !== ExportResult.SUCCESS) {
globalErrorHandler(
new Error(
`SimpleSpanProcessor: span export failed (status ${result})`
)
);
}
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
* limitations under the License.
*/

import { AlwaysOnSampler, ExportResult } from '@opentelemetry/core';
import {
AlwaysOnSampler,
ExportResult,
loggingErrorHandler,
setGlobalErrorHandler,
} from '@opentelemetry/core';
import * as assert from 'assert';
import * as sinon from 'sinon';
import {
Expand Down Expand Up @@ -199,7 +204,7 @@ describe('BatchSpanProcessor', () => {
let processor: BatchSpanProcessor;

beforeEach(() => {
processor = new BatchSpanProcessor(exporter);
processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
const span = createSampledSpan('test');
processor.onStart(span);
processor.onEnd(span);
Expand Down Expand Up @@ -228,6 +233,43 @@ describe('BatchSpanProcessor', () => {
done();
});
});

it('should call globalErrorHandler when exporting fails', async () => {
const expectedError = new Error(
'BatchSpanProcessor: span export failed (status 1)'
);
sinon.stub(exporter, 'export').callsFake((_, callback) => {
setTimeout(() => {
callback(ExportResult.FAILED_NOT_RETRYABLE);
}, 0);
});

const errorHandlerSpy = sinon.spy();

setGlobalErrorHandler(errorHandlerSpy);

// Cause a flush by emitting more spans then the default buffer size
for (let i = 0; i < defaultBufferConfig.bufferSize; i++) {
const span = createSampledSpan('test');
processor.onStart(span);
processor.onEnd(span);
}

await new Promise(resolve => {
setTimeout(() => {
resolve();
}, 0);
});

assert.strictEqual(errorHandlerSpy.callCount, 1);

const [[error]] = errorHandlerSpy.args;

assert.deepStrictEqual(error, expectedError);

//reset global error handler
setGlobalErrorHandler(loggingErrorHandler());
});
});

describe('flushing spans with exporter triggering instrumentation', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
*/

import * as assert from 'assert';
import * as sinon from 'sinon';
import {
Span,
BasicTracerProvider,
InMemorySpanExporter,
SimpleSpanProcessor,
} from '../../src';
import {
ExportResult,
setGlobalErrorHandler,
loggingErrorHandler,
} from '@opentelemetry/core';
import { SpanContext, SpanKind, TraceFlags, context } from '@opentelemetry/api';
import { TestTracingSpanExporter } from './TestTracingSpanExporter';
import { TestStackContextManager } from './TestStackContextManager';
Expand Down Expand Up @@ -82,6 +88,51 @@ describe('SimpleSpanProcessor', () => {
await processor.shutdown();
assert.strictEqual(exporter.getFinishedSpans().length, 0);
});

it('should call globalErrorHandler when exporting fails', async () => {
const expectedError = new Error(
'SimpleSpanProcessor: span export failed (status 1)'
);
const processor = new SimpleSpanProcessor(exporter);
const spanContext: SpanContext = {
traceId: 'a3cda95b652f4a1592b449d5929fda1b',
spanId: '5e0c63257de34c92',
traceFlags: TraceFlags.NONE,
};
const span = new Span(
provider.getTracer('default'),
'span-name',
spanContext,
SpanKind.CLIENT
);

sinon.stub(exporter, 'export').callsFake((_, callback) => {
setTimeout(() => {
callback(ExportResult.FAILED_NOT_RETRYABLE);
}, 0);
});

const errorHandlerSpy = sinon.spy();

setGlobalErrorHandler(errorHandlerSpy);

processor.onEnd(span);

await new Promise(resolve => {
setTimeout(() => {
resolve();
}, 0);
});

assert.strictEqual(errorHandlerSpy.callCount, 1);

const [[error]] = errorHandlerSpy.args;

assert.deepStrictEqual(error, expectedError);

//reset global error handler
setGlobalErrorHandler(loggingErrorHandler());
});
});

describe('force flush', () => {
Expand Down

0 comments on commit b523dab

Please sign in to comment.