Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeouts rework (part 2) #168

Merged
merged 12 commits into from
Jun 22, 2023
2 changes: 1 addition & 1 deletion coverage/badge.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
56 changes: 25 additions & 31 deletions src/connection/adapter/base_http_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { getAsText, isStream } from '../../utils'
import type { ClickHouseSettings } from '../../settings'
import { getUserAgent } from '../../utils/user_agent'
import * as uuid from 'uuid'
import type * as net from 'net'

export interface RequestParams {
method: 'GET' | 'POST'
Expand Down Expand Up @@ -106,29 +107,18 @@ export abstract class BaseHttpAdapter implements Connection {

protected abstract createClientRequest(
params: RequestParams,
abort_signal: AbortSignal
abort_signal?: AbortSignal
): Http.ClientRequest

protected async request(params: RequestParams): Promise<Stream.Readable> {
return new Promise((resolve, reject) => {
const start = Date.now()

const abortController = new AbortController()
let isTimedOut = false
const timeout = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, this.config.request_timeout).unref()

const request = this.createClientRequest(params, abortController.signal)
const request = this.createClientRequest(params, params.abort_signal)

function onError(err: Error): void {
removeRequestListeners()
if (isTimedOut) {
reject(new Error('Request timed out'))
} else {
reject(err)
}
reject(err)
}

const onResponse = async (
Expand Down Expand Up @@ -159,11 +149,7 @@ export abstract class BaseHttpAdapter implements Connection {
* see the full sequence of events https://nodejs.org/api/http.html#httprequesturl-options-callback
* */
})
if (isTimedOut) {
reject(new Error('Timeout error'))
} else {
reject(new Error('The request was aborted.'))
}
reject(new Error('The request was aborted.'))
}

function onClose(): void {
Expand All @@ -173,35 +159,43 @@ export abstract class BaseHttpAdapter implements Connection {
removeRequestListeners()
}

function onUserAbortSignal(): void {
abortController.abort()
const config = this.config
function onSocket(socket: net.Socket): void {
// Force KeepAlive usage (workaround due to Node.js bug)
// https://github.com/nodejs/node/issues/47137#issuecomment-1477075229
socket.setKeepAlive(true, 1000)
socket.setTimeout(config.request_timeout, onTimeout)
}

function onTimeout(): void {
removeRequestListeners()
request.destroy()
reject(new Error('Timeout error'))
}

function removeRequestListeners(): void {
clearTimeout(timeout)
if (request.socket !== null) {
request.socket.setTimeout(0) // reset previously set timeout
request.socket.removeListener('timeout', onTimeout)
}
request.removeListener('socket', onSocket)
request.removeListener('response', onResponse)
request.removeListener('error', onError)
request.removeListener('close', onClose)
if (params.abort_signal !== undefined) {
params.abort_signal.removeEventListener('abort', onUserAbortSignal)
request.removeListener('abort', onAbort)
}
abortController.signal.removeEventListener('abort', onAbort)
}

request.on('socket', onSocket)
request.on('response', onResponse)
request.on('error', onError)
request.on('close', onClose)

if (params.abort_signal !== undefined) {
params.abort_signal.addEventListener('abort', onUserAbortSignal, {
once: true,
})
params.abort_signal.addEventListener('abort', onAbort, { once: true })
}

abortController.signal.addEventListener('abort', onAbort, {
once: true,
})

if (!params.body) return request.end()

const bodyStream = isStream(params.body)
Expand Down
2 changes: 1 addition & 1 deletion src/connection/adapter/http_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class HttpAdapter extends BaseHttpAdapter implements Connection {

protected createClientRequest(
params: RequestParams,
abort_signal: AbortSignal
abort_signal?: AbortSignal
): Http.ClientRequest {
return Http.request(params.url, {
method: params.method,
Expand Down
2 changes: 1 addition & 1 deletion src/connection/adapter/https_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class HttpsAdapter extends BaseHttpAdapter implements Connection {

protected createClientRequest(
params: RequestParams,
abort_signal: AbortSignal
abort_signal?: AbortSignal
): Http.ClientRequest {
return Https.request(params.url, {
method: params.method,
Expand Down