Skip to content

Commit

Permalink
chore: adding force flush to span processors (open-telemetry#802)
Browse files Browse the repository at this point in the history
* chore: adding force flash to span processors

* chore: fixing merge

* chore: fixing merge

Co-authored-by: Mayur Kale <mayurkale@google.com>
  • Loading branch information
obecny and mayurkale22 authored Mar 3, 2020
1 parent cf93906 commit b4cfde3
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,5 @@ export class CollectorExporter implements SpanExporter {

// platform dependent
onShutdown(this.shutdown);

// @TODO get spans from span processor (batch)
this._exportSpans([])
.then(() => {
this.logger.debug('shutdown completed');
})
.catch(() => {});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,6 @@ describe('CollectorExporter - common', () => {
onShutdownSpy.restore();
});

it('should export spans once only', done => {
collectorExporter.shutdown();
collectorExporter.shutdown();
collectorExporter.shutdown();

setTimeout(() => {
assert.strictEqual(onShutdownSpy.callCount, 1);
done();
});
});

it('should call onShutdown', done => {
collectorExporter.shutdown();
setTimeout(() => {
Expand Down
2 changes: 0 additions & 2 deletions packages/opentelemetry-exporter-zipkin/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ export interface ExporterConfig {
logger?: types.Logger;
serviceName: string;
url?: string;
// Initiates a request with spans in memory to the backend.
forceFlush?: boolean;
// Optional mapping overrides for OpenTelemetry status code and description.
statusCodeTagName?: string;
statusDescriptionTagName?: string;
Expand Down
7 changes: 0 additions & 7 deletions packages/opentelemetry-exporter-zipkin/src/zipkin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import { OT_REQUEST_HEADER } from './utils';
*/
export class ZipkinExporter implements SpanExporter {
static readonly DEFAULT_URL = 'http://localhost:9411/api/v2/spans';
private readonly _forceFlush: boolean;
private readonly _logger: types.Logger;
private readonly _serviceName: string;
private readonly _statusCodeTagName: string;
Expand All @@ -45,7 +44,6 @@ export class ZipkinExporter implements SpanExporter {
const urlStr = config.url || ZipkinExporter.DEFAULT_URL;
const urlOpts = url.parse(urlStr);

this._forceFlush = config.forceFlush || true;
this._logger = config.logger || new NoopLogger();
this._reqOpts = Object.assign(
{
Expand Down Expand Up @@ -88,11 +86,6 @@ export class ZipkinExporter implements SpanExporter {
return;
}
this._isShutdown = true;
// Make an optimistic flush.
if (this._forceFlush) {
// @todo get spans from span processor (batch)
this._sendSpans([]);
}
}

/**
Expand Down
16 changes: 0 additions & 16 deletions packages/opentelemetry-exporter-zipkin/test/zipkin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,6 @@ describe('ZipkinExporter', () => {
assert.ok(typeof exporter.export === 'function');
assert.ok(typeof exporter.shutdown === 'function');
});
it('should construct an exporter with forceFlush', () => {
const exporter = new ZipkinExporter({
serviceName: 'my-service',
forceFlush: false,
});
assert.ok(typeof exporter.export === 'function');
assert.ok(typeof exporter.shutdown === 'function');
});
it('should construct an exporter with statusCodeTagName', () => {
const exporter = new ZipkinExporter({
serviceName: 'my-service',
Expand Down Expand Up @@ -338,13 +330,5 @@ describe('ZipkinExporter', () => {

// @todo: implement
it('should send by default');
it('should not send with forceFlush=false', () => {
const exporter = new ZipkinExporter({
serviceName: 'my-service',
forceFlush: false,
});

exporter.shutdown();
});
});
});
4 changes: 4 additions & 0 deletions packages/opentelemetry-tracing/src/MultiSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import { SpanProcessor } from './SpanProcessor';
export class MultiSpanProcessor implements SpanProcessor {
constructor(private readonly _spanProcessors: SpanProcessor[]) {}

forceFlush(): void {
// do nothing as all spans are being exported without waiting
}

onStart(span: Span): void {
for (const spanProcessor of this._spanProcessors) {
spanProcessor.onStart(span);
Expand Down
1 change: 1 addition & 0 deletions packages/opentelemetry-tracing/src/NoopSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ export class NoopSpanProcessor implements SpanProcessor {
onStart(span: Span): void {}
onEnd(span: Span): void {}
shutdown(): void {}
forceFlush(): void {}
}
5 changes: 5 additions & 0 deletions packages/opentelemetry-tracing/src/SpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import { Span } from '@opentelemetry/api';
* for when a {@link Span} is started or when a {@link Span} is ended.
*/
export interface SpanProcessor {
/**
* Forces to export all finished spans
*/
forceFlush(): void;

/**
* Called when a {@link Span} is started, if the `span.isRecording()`
* returns true.
Expand Down
18 changes: 17 additions & 1 deletion packages/opentelemetry-tracing/src/export/BatchSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ export class BatchSpanProcessor implements SpanProcessor {
private _finishedSpans: ReadableSpan[] = [];
private _lastSpanFlush = Date.now();
private _timer: NodeJS.Timeout;
private _isShutdown = false;

constructor(private readonly _exporter: SpanExporter, config?: BufferConfig) {
this._bufferSize =
config && config.bufferSize ? config.bufferSize : DEFAULT_BUFFER_SIZE;
this._bufferTimeout =
config && config.bufferTimeout
config && typeof config.bufferTimeout === 'number'
? config.bufferTimeout
: DEFAULT_BUFFER_TIMEOUT_MS;

Expand All @@ -51,15 +52,30 @@ export class BatchSpanProcessor implements SpanProcessor {
unrefTimer(this._timer);
}

forceFlush(): void {
if (this._isShutdown) {
return;
}
this._flush();
}

// does nothing.
onStart(span: Span): void {}

onEnd(span: Span): void {
if (this._isShutdown) {
return;
}
this._addToBuffer(span.toReadableSpan());
}

shutdown(): void {
if (this._isShutdown) {
return;
}
clearInterval(this._timer);
this.forceFlush();
this._isShutdown = true;
this._exporter.shutdown();
}

Expand Down
13 changes: 13 additions & 0 deletions packages/opentelemetry-tracing/src/export/SimpleSpanProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,28 @@ import { SpanExporter } from './SpanExporter';
*/
export class SimpleSpanProcessor implements SpanProcessor {
constructor(private readonly _exporter: SpanExporter) {}
private _isShutdown = false;

forceFlush(): void {
// do nothing as all spans are being exported without waiting
}

// does nothing.
onStart(span: Span): void {}

onEnd(span: Span): void {
if (this._isShutdown) {
return;
}
this._exporter.export([span.toReadableSpan()], () => {});
}

shutdown(): void {
if (this._isShutdown) {
return;
}
this._isShutdown = true;

this._exporter.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class TestProcessor implements SpanProcessor {
shutdown(): void {
this.spans = [];
}
forceFlush(): void {}
}

describe('MultiSpanProcessor', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,32 @@ describe('BatchSpanProcessor', () => {
});

describe('.onStart/.onEnd/.shutdown', () => {
it('should do nothing after processor is shutdown', () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
const spy: sinon.SinonSpy = sinon.spy(exporter, 'export') as any;

const span = createSampledSpan(`${name}_0`);

processor.onEnd(span);
assert.strictEqual(processor['_finishedSpans'].length, 1);

processor.forceFlush();
assert.strictEqual(exporter.getFinishedSpans().length, 1);

processor.onEnd(span);
assert.strictEqual(processor['_finishedSpans'].length, 1);

assert.strictEqual(spy.args.length, 1);
processor.shutdown();
assert.strictEqual(spy.args.length, 2);
assert.strictEqual(exporter.getFinishedSpans().length, 0);

processor.onEnd(span);
assert.strictEqual(spy.args.length, 2);
assert.strictEqual(processor['_finishedSpans'].length, 0);
assert.strictEqual(exporter.getFinishedSpans().length, 0);
});

it('should export the sampled spans with buffer size reached', () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.bufferSize; i++) {
Expand Down Expand Up @@ -105,5 +131,16 @@ describe('BatchSpanProcessor', () => {

clock.restore();
});

it('should force flush on demand', () => {
const processor = new BatchSpanProcessor(exporter, defaultBufferConfig);
for (let i = 0; i < defaultBufferConfig.bufferSize; i++) {
const span = createSampledSpan(`${name}_${i}`);
processor.onEnd(span);
}
assert.strictEqual(exporter.getFinishedSpans().length, 0);
processor.forceFlush();
assert.strictEqual(exporter.getFinishedSpans().length, 5);
});
});
});

0 comments on commit b4cfde3

Please sign in to comment.