Skip to content

Commit

Permalink
fix: handle more scenarios for stream reconnection (#429)
Browse files Browse the repository at this point in the history
  • Loading branch information
alvarowolfx authored Mar 20, 2024
1 parent cbc7e94 commit e6f9323
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 22 deletions.
27 changes: 8 additions & 19 deletions src/managedwriter/stream_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,25 +133,14 @@ export class StreamConnection extends EventEmitter {
};

private shouldReconnect(err: gax.GoogleError): boolean {
if (
err.code &&
[gax.Status.UNAVAILABLE, gax.Status.RESOURCE_EXHAUSTED].includes(
err.code
) &&
err.message
) {
const detail = err.message.toLowerCase();
const knownErrors = [
'service is currently unavailable', // schema mismatch
'read econnreset', // idle connection reset
'bandwidth exhausted',
'memory limit exceeded',
];
const isKnownError =
knownErrors.findIndex(err => detail.includes(err)) !== -1;
return isKnownError;
}
return false;
const reconnectionErrorCodes = [
gax.Status.UNAVAILABLE,
gax.Status.RESOURCE_EXHAUSTED,
gax.Status.ABORTED,
gax.Status.CANCELLED,
gax.Status.DEADLINE_EXCEEDED,
];
return !!err.code && reconnectionErrorCodes.includes(err.code);
}

private isPermanentError(err: gax.GoogleError): boolean {
Expand Down
6 changes: 3 additions & 3 deletions src/managedwriter/writer_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import {StreamConnection} from './stream_connection';

type StreamConnections = {
connectionList: StreamConnection[];
connections: Record<string, StreamConnection>;
};
type CreateWriteStreamRequest =
protos.google.cloud.bigquery.storage.v1.ICreateWriteStreamRequest;
Expand Down Expand Up @@ -68,7 +67,6 @@ export class WriterClient {
});
this._connections = {
connectionList: [],
connections: {},
};
this._open = false;
}
Expand Down Expand Up @@ -189,7 +187,6 @@ export class WriterClient {
options
);
this._connections.connectionList.push(streamConnection);
this._connections.connections[`${streamId}`] = streamConnection;
return streamConnection;
} catch (err) {
throw new Error('managed stream connection failed:' + err);
Expand Down Expand Up @@ -230,6 +227,9 @@ export class WriterClient {
this._connections.connectionList.map(conn => {
conn.close();
});
this._connections = {
connectionList: [],
};
this._open = false;
}

Expand Down
89 changes: 89 additions & 0 deletions system-test/managed_writer_client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import * as assert from 'assert';
import {describe, it, xit} from 'mocha';
import * as uuid from 'uuid';
import * as gax from 'google-gax';
import * as sinon from 'sinon';
import {BigQuery, TableSchema} from '@google-cloud/bigquery';
import * as protos from '../protos/protos';
import * as bigquerywriter from '../src';
Expand All @@ -24,6 +25,9 @@ import {ClientOptions} from 'google-gax';
import * as customerRecordProtoJson from '../samples/customer_record.json';
import {JSONEncoder} from '../src/managedwriter/encoder';

const sandbox = sinon.createSandbox();
afterEach(() => sandbox.restore());

const {managedwriter, adapt} = bigquerywriter;
const {WriterClient, Writer, JSONWriter, parseStorageErrors} = managedwriter;
const {Type} = protobuf;
Expand Down Expand Up @@ -851,6 +855,91 @@ describe('managedwriter.WriterClient', () => {
}
});

it('should trigger reconnection given some specific errors', async () => {
bqWriteClient.initialize();
const client = new WriterClient();
client.setClient(bqWriteClient);

const connection = await client.createStreamConnection({
streamType: managedwriter.PendingStream,
destinationTable: parent,
});

let reconnectedCalled = false;
sandbox.stub(connection, 'reconnect').callsFake(() => {
reconnectedCalled = true;
});

const writer = new JSONWriter({
connection,
protoDescriptor,
});

try {
// Write some data and trigger error
const pw = writer.appendRows(
[
{
customer_name: 'Ada Lovelace',
row_num: 1,
},
{
customer_name: 'Alan Turing',
row_num: 2,
},
],
0
);
await pw.getResult();

const reconnectErrorCases: gax.GoogleError[] = [
{
code: gax.Status.ABORTED,
msg: 'Closing the stream because it has been inactive',
},
{
code: gax.Status.RESOURCE_EXHAUSTED,
msg: 'read econnreset',
},
{
code: gax.Status.ABORTED,
msg: 'service is currently unavailable',
},
{
code: gax.Status.RESOURCE_EXHAUSTED,
msg: 'bandwidth exhausted',
},
{
code: gax.Status.RESOURCE_EXHAUSTED,
msg: 'memory limit exceeded',
},
{
code: gax.Status.CANCELLED,
msg: 'any',
},
{
code: gax.Status.DEADLINE_EXCEEDED,
msg: 'a msg',
},
].map(err => {
const gerr = new gax.GoogleError(err.msg);
gerr.code = err.code;
return gerr;
});
for (const gerr of reconnectErrorCases) {
const conn = connection['_connection'] as gax.CancellableStream; // private method
conn.emit('error', gerr);
assert.equal(reconnectedCalled, true);

reconnectedCalled = false; // reset flag
}

writer.close();
} finally {
client.close();
}
});

xit('reconnect on idle connection', async () => {
bqWriteClient.initialize();
const client = new WriterClient();
Expand Down

0 comments on commit e6f9323

Please sign in to comment.