diff --git a/lib/_http_agent.js b/lib/_http_agent.js index eb98f2b0bd1ca4..12f8529c38097c 100644 --- a/lib/_http_agent.js +++ b/lib/_http_agent.js @@ -40,6 +40,13 @@ const { async_id_symbol } = require('internal/async_hooks').symbols; // ClientRequest.onSocket(). The Agent is now *strictly* // concerned with managing a connection pool. +class ReusedHandle { + constructor(type, handle) { + this.type = type; + this.handle = handle; + } +} + function Agent(options) { if (!(this instanceof Agent)) return new Agent(options); @@ -166,10 +173,11 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */, // We have a free socket, so use that. var socket = this.freeSockets[name].shift(); // Guard against an uninitialized or user supplied Socket. - if (socket._handle && typeof socket._handle.asyncReset === 'function') { + const handle = socket._handle; + if (handle && typeof handle.asyncReset === 'function') { // Assign the handle a new asyncId and run any destroy()/init() hooks. - socket._handle.asyncReset(); - socket[async_id_symbol] = socket._handle.getAsyncId(); + handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle)); + socket[async_id_symbol] = handle.getAsyncId(); } // don't leak diff --git a/src/async_wrap.cc b/src/async_wrap.cc index 8cb30a6f6a54fc..e11ce076819c6c 100644 --- a/src/async_wrap.cc +++ b/src/async_wrap.cc @@ -410,13 +410,26 @@ void AsyncWrap::PopAsyncIds(const FunctionCallbackInfo& args) { void AsyncWrap::AsyncReset(const FunctionCallbackInfo& args) { + CHECK(args[0]->IsObject()); + AsyncWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + Local resource = args[0].As(); double execution_async_id = - args[0]->IsNumber() ? args[0].As()->Value() : kInvalidAsyncId; - wrap->AsyncReset(execution_async_id); + args[1]->IsNumber() ? args[1].As()->Value() : kInvalidAsyncId; + wrap->AsyncReset(resource, execution_async_id); } + +void AsyncWrap::GetProviderType(const FunctionCallbackInfo& args) { + AsyncWrap* wrap; + args.GetReturnValue().Set(AsyncWrap::PROVIDER_NONE); + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + args.GetReturnValue().Set(wrap->provider_type()); +} + + void AsyncWrap::EmitDestroy() { AsyncWrap::EmitDestroy(env(), async_id_); // Ensure no double destroy is emitted via AsyncReset(). @@ -437,6 +450,7 @@ Local AsyncWrap::GetConstructorTemplate(Environment* env) { tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "AsyncWrap")); env->SetProtoMethod(tmpl, "getAsyncId", AsyncWrap::GetAsyncId); env->SetProtoMethod(tmpl, "asyncReset", AsyncWrap::AsyncReset); + env->SetProtoMethod(tmpl, "getProviderType", AsyncWrap::GetProviderType); env->set_async_wrap_ctor_template(tmpl); } return tmpl; diff --git a/src/async_wrap.h b/src/async_wrap.h index 3a8789f89bae78..0fe41352234d06 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -133,6 +133,7 @@ class AsyncWrap : public BaseObject { static void PushAsyncIds(const v8::FunctionCallbackInfo& args); static void PopAsyncIds(const v8::FunctionCallbackInfo& args); static void AsyncReset(const v8::FunctionCallbackInfo& args); + static void GetProviderType(const v8::FunctionCallbackInfo& args); static void QueueDestroyAsyncId( const v8::FunctionCallbackInfo& args); diff --git a/test/async-hooks/init-hooks.js b/test/async-hooks/init-hooks.js index 817b2db0c781b4..b10ec827198f6d 100644 --- a/test/async-hooks/init-hooks.js +++ b/test/async-hooks/init-hooks.js @@ -158,7 +158,7 @@ class ActivityCollector { // events this makes sense for a few tests in which we enable some hooks // later if (this._allowNoInit) { - const stub = { uid, type: 'Unknown', handleIsObject: true }; + const stub = { uid, type: 'Unknown', handleIsObject: true, handle: {} }; this._activities.set(uid, stub); return stub; } else if (!common.isMainThread) { @@ -184,7 +184,8 @@ class ActivityCollector { triggerAsyncId, // In some cases (e.g. Timeout) the handle is a function, thus the usual // `typeof handle === 'object' && handle !== null` check can't be used. - handleIsObject: handle instanceof Object + handleIsObject: handle instanceof Object, + handle }; this._stamp(activity, 'init'); this._activities.set(uid, activity); diff --git a/test/async-hooks/test-http-agent-handle-reuse.js b/test/async-hooks/test-http-agent-handle-reuse.js new file mode 100644 index 00000000000000..4db83bec54a7bf --- /dev/null +++ b/test/async-hooks/test-http-agent-handle-reuse.js @@ -0,0 +1,110 @@ +'use strict'; +// Flags: --expose-internals +const common = require('../common'); +const initHooks = require('./init-hooks'); +const { checkInvocations } = require('./hook-checks'); +const assert = require('assert'); +const { async_id_symbol } = require('internal/async_hooks').symbols; +const http = require('http'); + +// Checks that the async resource used in init in case of a resused handle +// is not reused. Test is based on parallel\test-async-hooks-http-agent.js. + +const hooks = initHooks(); +hooks.enable(); + +let asyncIdAtFirstReq; +let asyncIdAtSecondReq; + +// Make sure a single socket is transparently reused for 2 requests. +const agent = new http.Agent({ + keepAlive: true, + keepAliveMsecs: Infinity, + maxSockets: 1 +}); + +const server = http.createServer(common.mustCall((req, res) => { + req.once('data', common.mustCallAtLeast(() => { + res.writeHead(200, { 'Content-Type': 'text/plain' }); + res.write('foo'); + })); + req.on('end', common.mustCall(() => { + res.end('bar'); + })); +}, 2)).listen(0, common.mustCall(() => { + const port = server.address().port; + const payload = 'hello world'; + + // First request. This is useless except for adding a socket to the + // agent’s pool for reuse. + const r1 = http.request({ + agent, port, method: 'POST' + }, common.mustCall((res) => { + // Remember which socket we used. + const socket = res.socket; + asyncIdAtFirstReq = socket[async_id_symbol]; + assert.ok(asyncIdAtFirstReq > 0, `${asyncIdAtFirstReq} > 0`); + // Check that request and response share their socket. + assert.strictEqual(r1.socket, socket); + + res.on('data', common.mustCallAtLeast(() => {})); + res.on('end', common.mustCall(() => { + // setImmediate() to give the agent time to register the freed socket. + setImmediate(common.mustCall(() => { + // The socket is free for reuse now. + assert.strictEqual(socket[async_id_symbol], -1); + + // Second request. To re-create the exact conditions from the + // referenced issue, we use a POST request without chunked encoding + // (hence the Content-Length header) and call .end() after the + // response header has already been received. + const r2 = http.request({ + agent, port, method: 'POST', headers: { + 'Content-Length': payload.length + } + }, common.mustCall((res) => { + asyncIdAtSecondReq = res.socket[async_id_symbol]; + assert.ok(asyncIdAtSecondReq > 0, `${asyncIdAtSecondReq} > 0`); + assert.strictEqual(r2.socket, socket); + + // Empty payload, to hit the “right” code path. + r2.end(''); + + res.on('data', common.mustCallAtLeast(() => {})); + res.on('end', common.mustCall(() => { + // Clean up to let the event loop stop. + server.close(); + agent.destroy(); + })); + })); + + // Schedule a payload to be written immediately, but do not end the + // request just yet. + r2.write(payload); + })); + })); + })); + r1.end(payload); +})); + + +process.on('exit', onExit); + +function onExit() { + hooks.disable(); + hooks.sanityCheck(); + const activities = hooks.activities; + + // Verify both invocations + const first = activities.filter((x) => x.uid === asyncIdAtFirstReq)[0]; + checkInvocations(first, { init: 1, destroy: 1 }, 'when process exits'); + + const second = activities.filter((x) => x.uid === asyncIdAtSecondReq)[0]; + checkInvocations(second, { init: 1, destroy: 1 }, 'when process exits'); + + // Verify reuse handle has been wrapped + assert.strictEqual(first.type, second.type); + assert.ok(first.handle !== second.handle, 'Resource reused'); + assert.ok(first.handle === second.handle.handle, + 'Resource not wrapped correctly'); +}