Skip to content

Commit

Permalink
fix: Try to fix reconnection error by always force-closing requests, …
Browse files Browse the repository at this point in the history
…and locking reconnection process
  • Loading branch information
alkihis committed Oct 22, 2021
1 parent 568fea8 commit d73426e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 19 deletions.
3 changes: 2 additions & 1 deletion src/client-mixins/request-maker.mixin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export interface IGetHttpRequestArgs {
forceBodyMode?: TBodyMode;
enableAuth?: boolean;
enableRateLimitSave?: boolean;
timeout?: number;
}

export interface IGetStreamRequestArgs {
Expand Down Expand Up @@ -77,7 +78,7 @@ export abstract class ClientRequestMaker {
/** Send a new request and returns a wrapped `Promise<TwitterResponse<T>`. */
send<T = any>(requestParams: IGetHttpRequestArgs) : Promise<TwitterResponse<T>> {
const args = this.getHttpRequestArgs(requestParams);
const options = { method: args.method, headers: args.headers };
const options = { method: args.method, headers: args.headers, timeout: requestParams.timeout };
const enableRateLimitSave = requestParams.enableRateLimitSave !== false;

if (args.body) {
Expand Down
50 changes: 32 additions & 18 deletions src/stream/TweetStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ export interface IConnectTweetStreamParams {
/** Returns a number of milliseconds to wait for {tryOccurence} (starting from 1) */
export type TStreamConnectRetryFn = (tryOccurence: number) => number;

const basicReconnectRetry: TStreamConnectRetryFn = tryOccurence => Math.min((tryOccurence ** 2) * 1000, 25000);
// In seconds
const basicRetriesAttempt = [5, 15, 30, 60, 90, 120];
// Default retry function
const basicReconnectRetry: TStreamConnectRetryFn =
tryOccurence => tryOccurence > basicRetriesAttempt.length
? 120000
: basicRetriesAttempt[tryOccurence - 1] * 1000;

export class TweetStream<T = any> extends EventEmitter {
public autoReconnect = false;
Expand All @@ -36,6 +42,7 @@ export class TweetStream<T = any> extends EventEmitter {
protected retryTimeout?: NodeJS.Timeout;
protected keepAliveTimeout?: NodeJS.Timeout;
protected parser = new TweetStreamParser();
protected connectionProcessRunning = false;

protected req?: ClientRequest;
protected res?: IncomingMessage;
Expand Down Expand Up @@ -178,8 +185,11 @@ export class TweetStream<T = any> extends EventEmitter {
}
if (this.req) {
this.req.removeAllListeners();
// Close connection silentely
this.req.destroy();

if (!this.req.destroyed) {
// Close connection silentely
this.req.destroy();
}
}
}

Expand Down Expand Up @@ -263,33 +273,37 @@ export class TweetStream<T = any> extends EventEmitter {

/** Make a new request to (re)connect to Twitter. */
async reconnect() {
let initialConnection = true;
if (this.connectionProcessRunning) {
throw new Error('Connection process is already running.');
}

if (this.req) {
initialConnection = false;
this.connectionProcessRunning = true;

if (!this.req.destroyed) {
try {
let initialConnection = true;

if (this.req) {
initialConnection = false;
this.closeWithoutEmit();
}
}

const { req, res } = await new RequestHandlerHelper(this.requestData).makeRequestAndResolveWhenReady();
const { req, res } = await new RequestHandlerHelper(this.requestData).makeRequestAndResolveWhenReady();

this.req = req;
this.res = res;
this.req = req;
this.res = res;

this.emit(initialConnection ? ETwitterStreamEvent.Connected : ETwitterStreamEvent.Reconnected);
this.parser.reset();
this.initEventsFromRequest();
this.emit(initialConnection ? ETwitterStreamEvent.Connected : ETwitterStreamEvent.Reconnected);
this.parser.reset();
this.initEventsFromRequest();
} finally {
this.connectionProcessRunning = false;
}
}

protected async onConnectionError(retryOccurence = 0) {
this.unbindTimeouts();

// Close the request if necessary
if (this.req && !this.req.destroyed) {
this.closeWithoutEmit();
}
this.closeWithoutEmit();

// Terminate stream by events if necessary (no auto-reconnect or retries exceeded)
if (!this.autoReconnect) {
Expand Down

0 comments on commit d73426e

Please sign in to comment.