Skip to content

Commit

Permalink
fix: adjust async_hooks cls behavior (#734)
Browse files Browse the repository at this point in the history
PR-URL: #734
  • Loading branch information
kjin authored May 4, 2018
1 parent d8f3611 commit 79ab435
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 50 deletions.
121 changes: 92 additions & 29 deletions src/cls/async-hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* limitations under the License.
*/

// This file requires continuation-local-storage in the AsyncHooksCLS
// constructor, rather than upon module load.
// This file calls require('async_hooks') in the AsyncHooksCLS constructor,
// rather than upon module load.
import * as asyncHooksModule from 'async_hooks';
import {EventEmitter} from 'events';
import * as shimmer from 'shimmer';
Expand All @@ -31,29 +31,63 @@ const EVENT_EMITTER_METHODS: Array<keyof EventEmitter> =
const WRAPPED = Symbol('@google-cloud/trace-agent:AsyncHooksCLS:WRAPPED');

type ContextWrapped<T> = T&{[WRAPPED]?: boolean};
type Reference<T> = {
value: T
};

/**
* An implementation of continuation-local storage on top of the async_hooks
* module.
*/
export class AsyncHooksCLS<Context extends {}> implements CLS<Context> {
private currentContext: {value: Context};
private contexts: {[id: number]: Context} = {};
// instance-scope reference to avoid top-level require.
private ah: AsyncHooksModule;

/** A map of AsyncResource IDs to Context objects. */
private contexts: {[id: number]: Reference<Context>} = {};
/** The AsyncHook that proactively populates entries in this.contexts. */
private hook: asyncHooksModule.AsyncHook;
/** Whether this instance is enabled. */
private enabled = false;

constructor(private readonly defaultContext: Context) {
this.currentContext = {value: this.defaultContext};
this.hook = (require('async_hooks') as AsyncHooksModule).createHook({
// Store a reference to the async_hooks module, since we will need to query
// the current AsyncResource ID often.
this.ah = require('async_hooks') as AsyncHooksModule;

// Create the hook.
this.hook = this.ah.createHook({
init: (id: number, type: string, triggerId: number, resource: {}) => {
this.contexts[id] = this.currentContext.value;
},
before: (id: number) => {
if (this.contexts[id]) {
this.currentContext.value = this.contexts[id];
// init is called when a new AsyncResource is created. We want code
// that runs within the scope of this new AsyncResource to see the same
// context as its "parent" AsyncResource. The criteria for the parent
// depends on the type of the AsyncResource.
if (type === 'PROMISE') {
// Opt not to use the trigger ID for Promises, as this causes context
// confusion in applications using async/await.
// Instead, use the ID of the AsyncResource in whose scope we are
// currently running.
this.contexts[id] = this.contexts[this.ah.executionAsyncId()];
} else {
// Use the trigger ID for any other type. In Node core, this is
// usually equal the ID of the AsyncResource in whose scope we are
// currently running (the "current" AsyncResource), or that of one
// of its ancestors, so the behavior is not expected to be different
// from using the ID of the current AsyncResource instead.
// A divergence is expected only to arise through the user
// AsyncResource API, because users of that API can specify their own
// trigger ID. In this case, we choose to respect the user's
// selection.
this.contexts[id] = this.contexts[triggerId];
}
// Note that this function always assigns values in this.contexts to
// values under other keys, which may or may not be undefined. Consumers
// of the CLS API will get the sentinel (default) value if they query
// the current context when it is stored as undefined.
},
destroy: (id: number) => {
// destroy is called when the AsyncResource is no longer used, so also
// delete its entry in the map.
delete this.contexts[id];
}
});
Expand All @@ -64,51 +98,85 @@ export class AsyncHooksCLS<Context extends {}> implements CLS<Context> {
}

enable(): void {
this.currentContext.value = this.defaultContext;
this.contexts = {};
this.hook.enable();
this.enabled = true;
}

disable(): void {
this.currentContext.value = this.defaultContext;
this.contexts = {};
this.hook.disable();
this.enabled = false;
}

getContext(): Context {
return this.currentContext.value;
// We don't store this.defaultContext directly in this.contexts.
// Getting undefined when looking up this.contexts means that it wasn't
// set, so return the default context.
const current = this.contexts[this.ah.executionAsyncId()];
return current ? current.value : this.defaultContext;
}

setContext(value: Context): void {
this.currentContext.value = value;
const id = this.ah.executionAsyncId();
const current = this.contexts[id];
if (current) {
current.value = value;
} else {
this.contexts[id] = {value};
}
}

runWithNewContext<T>(fn: Func<T>): T {
const oldContext = this.currentContext.value;
this.currentContext.value = this.defaultContext;
// Run fn() so that any AsyncResource objects that are created in
// fn will have the context set by this.setContext.
const id = this.ah.executionAsyncId();
const oldContext = this.contexts[id];
// Reset the current context. This prevents this.getContext from returning
// a stale value.
this.contexts[id] = {value: this.defaultContext};
try {
return fn();
} finally {
this.currentContext.value = oldContext;
// Revert the current context to what it was before any calls to
// this.setContext from within fn.
this.contexts[id] = oldContext;
}
}

bindWithCurrentContext<T>(fn: Func<T>): Func<T> {
if ((fn as ContextWrapped<Func<T>>)[WRAPPED] || !this.currentContext) {
// Return if we have already wrapped the function.
if ((fn as ContextWrapped<Func<T>>)[WRAPPED]) {
return fn;
}
const current = this.currentContext;
const boundContext = this.currentContext.value;
// Capture the context of the current AsyncResource.
const boundContext = this.contexts[this.ah.executionAsyncId()];
// Return if there is no current context to bind.
if (!boundContext) {
return fn;
}
const that = this;
// TODO(kjin): This code is somewhat duplicated with runWithNewContext.
// Can we merge this?
// Wrap fn so that any AsyncResource objects that are created in fn will
// share context with that of the AsyncResource with the given ID.
const contextWrapper: ContextWrapped<Func<T>> = function(this: {}) {
const oldContext = current.value;
current.value = boundContext;
const id = that.ah.executionAsyncId();
const oldContext = that.contexts[id];
// Restore the captured context.
that.contexts[id] = boundContext;
try {
return fn.apply(this, arguments) as T;
} finally {
current.value = oldContext;
// Revert the current context to what it was before it was set to the
// captured context.
that.contexts[id] = oldContext;
}
};
// Prevent re-wrapping.
contextWrapper[WRAPPED] = true;
// Explicitly inherit the original function's length, because it is
// otherwise zero-ed out.
Object.defineProperty(contextWrapper, 'length', {
enumerable: false,
configurable: true,
Expand All @@ -118,11 +186,6 @@ export class AsyncHooksCLS<Context extends {}> implements CLS<Context> {
return contextWrapper;
}

// This function is not technically needed and all tests currently pass
// without it (after removing call sites). While it is not a complete
// solution, restoring correct context before running every request/response
// event handler reduces the number of situations in which userspace queuing
// will cause us to lose context.
patchEmitterToPropagateContext(ee: EventEmitter): void {
const that = this;
EVENT_EMITTER_METHODS.forEach((method) => {
Expand Down
3 changes: 3 additions & 0 deletions src/cls/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export interface CLS<Context extends {}> {

/**
* Runs the given function as the start of a new continuation.
* TODO(kjin): Merge this with setContext.
* @param fn The function to run synchronously.
* @returns The return result of running `fn`.
*/
Expand All @@ -82,6 +83,7 @@ export interface CLS<Context extends {}> {
* the CLS implementation's propagating mechanism doesn't automatically do so.
* If not called from within a continuation, behavior is implementation-
* defined.
* TODO(kjin): Determine a more accurate name for this function.
* @param fn The function to bind.
* @returns A wrapped version of the given function with the same signature.
*/
Expand All @@ -91,6 +93,7 @@ export interface CLS<Context extends {}> {
* Patches an EventEmitter to lazily bind all future event listeners on this
* instance so that they belong in the same continuation as the execution
* path in which they were attached to the EventEmitter object.
* TODO(kjin): Determine a more accurate name for this function.
* @param ee The EventEmitter to bind. This instance will be mutated.
*/
patchEmitterToPropagateContext(ee: EventEmitter): void;
Expand Down
122 changes: 122 additions & 0 deletions test/test-cls-ah.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/**
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as assert from 'assert';
import * as asyncHooksModule from 'async_hooks';
import {IContextDefinition} from 'mocha';
import * as semver from 'semver';

import {AsyncHooksCLS} from '../src/cls/async-hooks';

type AsyncHooksModule = typeof asyncHooksModule;

const TEST_ASYNC_RESOURCE = '@google-cloud/trace-agent:test';
const maybeSkip = (describe: IContextDefinition) =>
semver.satisfies(process.version, '>=8.1') ? describe : describe.skip;

maybeSkip(describe)('AsyncHooks-based CLS', () => {
let asyncHooks: AsyncHooksModule;
// tslint:disable-next-line:variable-name
let AsyncResource: typeof asyncHooksModule.AsyncResource;
let cls: AsyncHooksCLS<string>;

before(() => {
asyncHooks = require('async_hooks') as AsyncHooksModule;
AsyncResource = class extends asyncHooks.AsyncResource {
// tslint:disable:no-any
runInAsyncScope<This, Result>(
fn: (this: This, ...args: any[]) => Result, thisArg?: This): Result {
// tslint:enable:no-any
// Polyfill for versions in which runInAsyncScope isn't defined
if (super.runInAsyncScope) {
return super.runInAsyncScope.apply(this, arguments);
} else {
this.emitBefore();
try {
return fn.apply(
thisArg, Array.prototype.slice.apply(arguments).slice(2));
} finally {
this.emitAfter();
}
}
}
};
});

beforeEach(() => {
cls = new AsyncHooksCLS('default');
cls.enable();
});

it('Correctly assumes the type of Promise resources', () => {
const actual: Array<Promise<void>> = [];
const expected: Array<Promise<void>> = [];
const hook = asyncHooks
.createHook({
init:
(uid: number, type: string, tid: number,
resource: {promise: Promise<void>}) => {
if (type === 'PROMISE') {
actual.push(resource.promise);
}
}
})
.enable();
expected.push(Promise.resolve());
expected.push(actual[0].then(() => {}));
assert.deepStrictEqual(actual, expected);
hook.disable();
});

it('Supports basic context propagation across async-await boundaries', () => {
return cls.runWithNewContext(async () => {
cls.setContext('modified');
await Promise.resolve();
assert.strictEqual(cls.getContext(), 'modified');
await Promise.resolve();
assert.strictEqual(cls.getContext(), 'modified');
});
});

describe('Using AsyncResource API', () => {
it('Supports context propagation without trigger ID', async () => {
let res!: asyncHooksModule.AsyncResource;
await cls.runWithNewContext(async () => {
res = new AsyncResource(TEST_ASYNC_RESOURCE);
cls.setContext('modified');
});
res.runInAsyncScope(() => {
assert.strictEqual(cls.getContext(), 'modified');
});
});

it('Supports context propagation with trigger ID', async () => {
let triggerId!: number;
let res!: asyncHooksModule.AsyncResource;
await cls.runWithNewContext(async () => {
triggerId = new AsyncResource(TEST_ASYNC_RESOURCE).asyncId();
cls.setContext('correct');
});
await cls.runWithNewContext(async () => {
res = new AsyncResource(TEST_ASYNC_RESOURCE, triggerId);
cls.setContext('incorrect');
});
res.runInAsyncScope(() => {
assert.strictEqual(cls.getContext(), 'correct');
});
});
});
});
32 changes: 11 additions & 21 deletions test/test-cls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,16 @@ describe('Continuation-Local Storage', () => {
});

describe('Implementations', () => {
const testCases: Array<{clazz: CLSConstructor, testAsyncAwait: boolean}> =
asyncAwaitSupported ?
[
{clazz: AsyncHooksCLS, testAsyncAwait: true},
{clazz: AsyncListenerCLS, testAsyncAwait: false}
] :
[{clazz: AsyncListenerCLS, testAsyncAwait: false}];
const testCases: CLSConstructor[] = asyncAwaitSupported ?
[AsyncHooksCLS, AsyncListenerCLS] :
[AsyncListenerCLS];

for (const testCase of testCases) {
describe(`CLS for class ${testCase.clazz.name}`, () => {
const maybeSkip = (it: ITestDefinition) =>
testCase.testAsyncAwait ? it : it.skip;
describe(`CLS for class ${testCase.name}`, () => {
let c!: CLS<string>;

beforeEach(() => {
c = new testCase.clazz('default');
c = new testCase('default');
c.enable();
});

Expand Down Expand Up @@ -157,6 +151,12 @@ describe('Continuation-Local Storage', () => {
runLater();
assert.strictEqual(c.getContext(), 'default');
});
c.runWithNewContext(() => {
c.setContext('modified-but-different');
// bind it again
runLater = c.bindWithCurrentContext(runLater);
});
runLater();
});

it('Corrects context when function run with new context throws', () => {
Expand Down Expand Up @@ -234,16 +234,6 @@ describe('Continuation-Local Storage', () => {
});
});
});

maybeSkip(it)(
'Supports basic context propagation across await boundaries',
() => {
return c.runWithNewContext(async () => {
c.setContext('modified');
await Promise.resolve();
assert.strictEqual(c.getContext(), 'modified');
});
});
});
}
});
Expand Down
Loading

0 comments on commit 79ab435

Please sign in to comment.