Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: request abort signal #3209

Merged
merged 4 commits into from
May 7, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fixup
ronag committed May 7, 2024
commit 1f28af4309465b796e14e4c749bb03b636a6b188
15 changes: 10 additions & 5 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@

const assert = require('node:assert')
const { Readable } = require('./readable')
const { InvalidArgumentError, AbortError } = require('../core/errors')
const { InvalidArgumentError, RequestAbortedError } = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('node:async_hooks')
@@ -57,6 +57,7 @@ class RequestHandler extends AsyncResource {
this.highWaterMark = highWaterMark
this.signal = signal
this.reason = null
this.removeAbortListener = null

if (util.isStream(body)) {
body.on('error', (err) => {
@@ -66,10 +67,13 @@ class RequestHandler extends AsyncResource {

if (this.signal) {
if (this.signal.aborted) {
this.reason = this.signal.reason ?? new AbortError()
this.reason = this.signal.reason ?? new RequestAbortedError()
} else {
this.removeAbortListener = util.addAbortListener(this.signal, () => {
this.reason = this.signal.reason ?? new AbortError()
this.removeAbortListener?.()
this.removeAbortListener = null

this.reason = this.signal.reason ?? new RequestAbortedError()
if (this.res) {
util.destroy(this.res, this.reason)
} else if (this.abort) {
@@ -152,6 +156,9 @@ class RequestHandler extends AsyncResource {
onError (err) {
const { res, callback, body, opaque } = this

this.removeAbortListener?.()
this.removeAbortListener = null

if (callback) {
// TODO: Does this need queueMicrotask?
this.callback = null
@@ -166,8 +173,6 @@ class RequestHandler extends AsyncResource {
queueMicrotask(() => {
util.destroy(res, err)
})
} else if (this.removeAbortListener) {
this.removeAbortListener()
}

if (body) {
78 changes: 39 additions & 39 deletions test/client-request.js
Original file line number Diff line number Diff line change
@@ -135,45 +135,45 @@ test('request hwm', async (t) => {
await t.completed
})

// test('request abort before headers', async (t) => {
// t = tspl(t, { plan: 6 })

// const signal = new EE()
// const server = createServer((req, res) => {
// res.end('hello')
// signal.emit('abort')
// })
// after(() => server.close())

// server.listen(0, () => {
// const client = new Client(`http://localhost:${server.address().port}`)
// after(() => client.destroy())

// client[kConnect](() => {
// client.request({
// path: '/',
// method: 'GET',
// signal
// }, (err) => {
// t.ok(err instanceof errors.RequestAbortedError)
// t.strictEqual(signal.listenerCount('abort'), 0)
// })
// t.strictEqual(signal.listenerCount('abort'), 1)

// client.request({
// path: '/',
// method: 'GET',
// signal
// }, (err) => {
// t.ok(err instanceof errors.RequestAbortedError)
// t.strictEqual(signal.listenerCount('abort'), 0)
// })
// t.strictEqual(signal.listenerCount('abort'), 2)
// })
// })

// await t.completed
// })
test('request abort before headers', async (t) => {
t = tspl(t, { plan: 6 })

const signal = new EE()
const server = createServer((req, res) => {
res.end('hello')
signal.emit('abort')
})
after(() => server.close())

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
after(() => client.destroy())

client[kConnect](() => {
client.request({
path: '/',
method: 'GET',
signal
}, (err) => {
t.ok(err instanceof errors.RequestAbortedError)
t.strictEqual(signal.listenerCount('abort'), 0)
})
t.strictEqual(signal.listenerCount('abort'), 1)

client.request({
path: '/',
method: 'GET',
signal
}, (err) => {
t.ok(err instanceof errors.RequestAbortedError)
t.strictEqual(signal.listenerCount('abort'), 0)
})
t.strictEqual(signal.listenerCount('abort'), 2)
})
})

await t.completed
})

test('request body destroyed on invalid callback', async (t) => {
t = tspl(t, { plan: 1 })
280 changes: 140 additions & 140 deletions test/pool.js
Original file line number Diff line number Diff line change
@@ -781,146 +781,146 @@ test('pool dispatch error', async (t) => {
await t.completed
})

// test('pool request abort in queue', async (t) => {
// t = tspl(t, { plan: 3 })

// const server = createServer((req, res) => {
// res.end('asd')
// })
// after(() => server.close())

// server.listen(0, async () => {
// const client = new Pool(`http://localhost:${server.address().port}`, {
// connections: 1,
// pipelining: 1
// })
// after(() => client.close())

// client.dispatch({
// path: '/',
// method: 'GET'
// }, {
// onConnect () {
// },
// onHeaders (statusCode, headers) {
// t.strictEqual(statusCode, 200)
// },
// onData (chunk) {
// },
// onComplete () {
// t.ok(true, 'pass')
// },
// onError () {
// }
// })

// const signal = new EventEmitter()
// client.request({
// path: '/',
// method: 'GET',
// signal
// }, (err) => {
// t.strictEqual(err.code, 'UND_ERR_ABORTED')
// })
// signal.emit('abort')
// })

// await t.completed
// })

// test('pool stream abort in queue', async (t) => {
// t = tspl(t, { plan: 3 })

// const server = createServer((req, res) => {
// res.end('asd')
// })
// after(() => server.close())

// server.listen(0, async () => {
// const client = new Pool(`http://localhost:${server.address().port}`, {
// connections: 1,
// pipelining: 1
// })
// after(() => client.close())

// client.dispatch({
// path: '/',
// method: 'GET'
// }, {
// onConnect () {
// },
// onHeaders (statusCode, headers) {
// t.strictEqual(statusCode, 200)
// },
// onData (chunk) {
// },
// onComplete () {
// t.ok(true, 'pass')
// },
// onError () {
// }
// })

// const signal = new EventEmitter()
// client.stream({
// path: '/',
// method: 'GET',
// signal
// }, ({ body }) => body, (err) => {
// t.strictEqual(err.code, 'UND_ERR_ABORTED')
// })
// signal.emit('abort')
// })

// await t.completed
// })

// test('pool pipeline abort in queue', async (t) => {
// t = tspl(t, { plan: 3 })

// const server = createServer((req, res) => {
// res.end('asd')
// })
// after(() => server.close())

// server.listen(0, async () => {
// const client = new Pool(`http://localhost:${server.address().port}`, {
// connections: 1,
// pipelining: 1
// })
// after(() => client.close())

// client.dispatch({
// path: '/',
// method: 'GET'
// }, {
// onConnect () {
// },
// onHeaders (statusCode, headers) {
// t.strictEqual(statusCode, 200)
// },
// onData (chunk) {
// },
// onComplete () {
// t.ok(true, 'pass')
// },
// onError () {
// }
// })

// const signal = new EventEmitter()
// client.pipeline({
// path: '/',
// method: 'GET',
// signal
// }, ({ body }) => body).end().on('error', (err) => {
// t.strictEqual(err.code, 'UND_ERR_ABORTED')
// })
// signal.emit('abort')
// })

// await t.completed
// })
test('pool request abort in queue', async (t) => {
t = tspl(t, { plan: 3 })

const server = createServer((req, res) => {
res.end('asd')
})
after(() => server.close())

server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
after(() => client.close())

client.dispatch({
path: '/',
method: 'GET'
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
},
onData (chunk) {
},
onComplete () {
t.ok(true, 'pass')
},
onError () {
}
})

const signal = new EventEmitter()
client.request({
path: '/',
method: 'GET',
signal
}, (err) => {
t.strictEqual(err.code, 'UND_ERR_ABORTED')
})
signal.emit('abort')
})

await t.completed
})

test('pool stream abort in queue', async (t) => {
t = tspl(t, { plan: 3 })

const server = createServer((req, res) => {
res.end('asd')
})
after(() => server.close())

server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
after(() => client.close())

client.dispatch({
path: '/',
method: 'GET'
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
},
onData (chunk) {
},
onComplete () {
t.ok(true, 'pass')
},
onError () {
}
})

const signal = new EventEmitter()
client.stream({
path: '/',
method: 'GET',
signal
}, ({ body }) => body, (err) => {
t.strictEqual(err.code, 'UND_ERR_ABORTED')
})
signal.emit('abort')
})

await t.completed
})

test('pool pipeline abort in queue', async (t) => {
t = tspl(t, { plan: 3 })

const server = createServer((req, res) => {
res.end('asd')
})
after(() => server.close())

server.listen(0, async () => {
const client = new Pool(`http://localhost:${server.address().port}`, {
connections: 1,
pipelining: 1
})
after(() => client.close())

client.dispatch({
path: '/',
method: 'GET'
}, {
onConnect () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
},
onData (chunk) {
},
onComplete () {
t.ok(true, 'pass')
},
onError () {
}
})

const signal = new EventEmitter()
client.pipeline({
path: '/',
method: 'GET',
signal
}, ({ body }) => body).end().on('error', (err) => {
t.strictEqual(err.code, 'UND_ERR_ABORTED')
})
signal.emit('abort')
})

await t.completed
})

test('pool stream constructor error destroy body', async (t) => {
t = tspl(t, { plan: 4 })
318 changes: 159 additions & 159 deletions test/request-timeout.js
Original file line number Diff line number Diff line change
@@ -178,165 +178,165 @@ test('overridden body timeout', async (t) => {
await t.completed
})

// test('With EE signal', async (t) => {
// t = tspl(t, { plan: 1 })

// const clock = FakeTimers.install({
// shouldClearNativeTimers: true,
// toFake: ['setTimeout', 'clearTimeout']
// })
// after(() => clock.uninstall())

// const orgTimers = { ...timers }
// Object.assign(timers, { setTimeout, clearTimeout })
// after(() => {
// Object.assign(timers, orgTimers)
// })

// const server = createServer((req, res) => {
// setTimeout(() => {
// res.end('hello')
// }, 100)
// clock.tick(100)
// })
// after(() => server.close())

// server.listen(0, () => {
// const client = new Client(`http://localhost:${server.address().port}`, {
// headersTimeout: 50
// })
// const ee = new EventEmitter()
// after(() => client.destroy())

// client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => {
// t.ok(err instanceof errors.HeadersTimeoutError)
// })

// clock.tick(50)
// })

// await t.completed
// })

// test('With abort-controller signal', async (t) => {
// t = tspl(t, { plan: 1 })

// const clock = FakeTimers.install({
// shouldClearNativeTimers: true,
// toFake: ['setTimeout', 'clearTimeout']
// })
// after(() => clock.uninstall())

// const orgTimers = { ...timers }
// Object.assign(timers, { setTimeout, clearTimeout })
// after(() => {
// Object.assign(timers, orgTimers)
// })

// const server = createServer((req, res) => {
// setTimeout(() => {
// res.end('hello')
// }, 100)
// clock.tick(100)
// })
// after(() => server.close())

// server.listen(0, () => {
// const client = new Client(`http://localhost:${server.address().port}`, {
// headersTimeout: 50
// })
// const abortController = new AbortController()
// after(() => client.destroy())

// client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => {
// t.ok(err instanceof errors.HeadersTimeoutError)
// })

// clock.tick(50)
// })

// await t.completed
// })

// test('Abort before timeout (EE)', async (t) => {
// t = tspl(t, { plan: 1 })

// const clock = FakeTimers.install({
// shouldClearNativeTimers: true,
// toFake: ['setTimeout', 'clearTimeout']
// })
// after(() => clock.uninstall())

// const orgTimers = { ...timers }
// Object.assign(timers, { setTimeout, clearTimeout })
// after(() => {
// Object.assign(timers, orgTimers)
// })

// const ee = new EventEmitter()
// const server = createServer((req, res) => {
// setTimeout(() => {
// res.end('hello')
// }, 100)
// ee.emit('abort')
// clock.tick(50)
// })
// after(() => server.close())

// server.listen(0, () => {
// const client = new Client(`http://localhost:${server.address().port}`, {
// headersTimeout: 50
// })
// after(() => client.destroy())

// client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => {
// t.ok(err instanceof errors.RequestAbortedError)
// clock.tick(100)
// })
// })

// await t.completed
// })

// test('Abort before timeout (abort-controller)', async (t) => {
// t = tspl(t, { plan: 1 })

// const clock = FakeTimers.install({
// shouldClearNativeTimers: true,
// toFake: ['setTimeout', 'clearTimeout']
// })
// after(() => clock.uninstall())

// const orgTimers = { ...timers }
// Object.assign(timers, { setTimeout, clearTimeout })
// after(() => {
// Object.assign(timers, orgTimers)
// })

// const abortController = new AbortController()
// const server = createServer((req, res) => {
// setTimeout(() => {
// res.end('hello')
// }, 100)
// abortController.abort()
// clock.tick(50)
// })
// after(() => server.close())

// server.listen(0, () => {
// const client = new Client(`http://localhost:${server.address().port}`, {
// headersTimeout: 50
// })
// after(() => client.destroy())

// client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => {
// t.ok(err instanceof errors.RequestAbortedError)
// clock.tick(100)
// })
// })

// await t.completed
// })
test('With EE signal', async (t) => {
t = tspl(t, { plan: 1 })

const clock = FakeTimers.install({
shouldClearNativeTimers: true,
toFake: ['setTimeout', 'clearTimeout']
})
after(() => clock.uninstall())

const orgTimers = { ...timers }
Object.assign(timers, { setTimeout, clearTimeout })
after(() => {
Object.assign(timers, orgTimers)
})

const server = createServer((req, res) => {
setTimeout(() => {
res.end('hello')
}, 100)
clock.tick(100)
})
after(() => server.close())

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
headersTimeout: 50
})
const ee = new EventEmitter()
after(() => client.destroy())

client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => {
t.ok(err instanceof errors.HeadersTimeoutError)
})

clock.tick(50)
})

await t.completed
})

test('With abort-controller signal', async (t) => {
t = tspl(t, { plan: 1 })

const clock = FakeTimers.install({
shouldClearNativeTimers: true,
toFake: ['setTimeout', 'clearTimeout']
})
after(() => clock.uninstall())

const orgTimers = { ...timers }
Object.assign(timers, { setTimeout, clearTimeout })
after(() => {
Object.assign(timers, orgTimers)
})

const server = createServer((req, res) => {
setTimeout(() => {
res.end('hello')
}, 100)
clock.tick(100)
})
after(() => server.close())

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
headersTimeout: 50
})
const abortController = new AbortController()
after(() => client.destroy())

client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => {
t.ok(err instanceof errors.HeadersTimeoutError)
})

clock.tick(50)
})

await t.completed
})

test('Abort before timeout (EE)', async (t) => {
t = tspl(t, { plan: 1 })

const clock = FakeTimers.install({
shouldClearNativeTimers: true,
toFake: ['setTimeout', 'clearTimeout']
})
after(() => clock.uninstall())

const orgTimers = { ...timers }
Object.assign(timers, { setTimeout, clearTimeout })
after(() => {
Object.assign(timers, orgTimers)
})

const ee = new EventEmitter()
const server = createServer((req, res) => {
setTimeout(() => {
res.end('hello')
}, 100)
ee.emit('abort')
clock.tick(50)
})
after(() => server.close())

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
headersTimeout: 50
})
after(() => client.destroy())

client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => {
t.ok(err instanceof errors.RequestAbortedError)
clock.tick(100)
})
})

await t.completed
})

test('Abort before timeout (abort-controller)', async (t) => {
t = tspl(t, { plan: 1 })

const clock = FakeTimers.install({
shouldClearNativeTimers: true,
toFake: ['setTimeout', 'clearTimeout']
})
after(() => clock.uninstall())

const orgTimers = { ...timers }
Object.assign(timers, { setTimeout, clearTimeout })
after(() => {
Object.assign(timers, orgTimers)
})

const abortController = new AbortController()
const server = createServer((req, res) => {
setTimeout(() => {
res.end('hello')
}, 100)
abortController.abort()
clock.tick(50)
})
after(() => server.close())

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
headersTimeout: 50
})
after(() => client.destroy())

client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => {
t.ok(err instanceof errors.RequestAbortedError)
clock.tick(100)
})
})

await t.completed
})

test('Timeout with pipelining', async (t) => {
t = tspl(t, { plan: 3 })