Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle more scenarios for stream reconnection #429

Merged
merged 6 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 8 additions & 19 deletions src/managedwriter/stream_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,25 +133,14 @@ export class StreamConnection extends EventEmitter {
};

private shouldReconnect(err: gax.GoogleError): boolean {
if (
err.code &&
[gax.Status.UNAVAILABLE, gax.Status.RESOURCE_EXHAUSTED].includes(
err.code
) &&
err.message
) {
const detail = err.message.toLowerCase();
const knownErrors = [
'service is currently unavailable', // schema mismatch
'read econnreset', // idle connection reset
'bandwidth exhausted',
'memory limit exceeded',
];
const isKnownError =
knownErrors.findIndex(err => detail.includes(err)) !== -1;
return isKnownError;
}
return false;
const reconnectionErrorCodes = [
gax.Status.UNAVAILABLE,
gax.Status.RESOURCE_EXHAUSTED,
gax.Status.ABORTED,
gax.Status.CANCELLED,
gax.Status.DEADLINE_EXCEEDED,
];
return !!err.code && reconnectionErrorCodes.includes(err.code);
}

private isPermanentError(err: gax.GoogleError): boolean {
Expand Down
6 changes: 3 additions & 3 deletions src/managedwriter/writer_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import {StreamConnection} from './stream_connection';

type StreamConnections = {
connectionList: StreamConnection[];
connections: Record<string, StreamConnection>;
};
type CreateWriteStreamRequest =
protos.google.cloud.bigquery.storage.v1.ICreateWriteStreamRequest;
Expand Down Expand Up @@ -68,7 +67,6 @@ export class WriterClient {
});
this._connections = {
connectionList: [],
connections: {},
};
this._open = false;
}
Expand Down Expand Up @@ -189,7 +187,6 @@ export class WriterClient {
options
);
this._connections.connectionList.push(streamConnection);
this._connections.connections[`${streamId}`] = streamConnection;
return streamConnection;
} catch (err) {
throw new Error('managed stream connection failed:' + err);
Expand Down Expand Up @@ -230,6 +227,9 @@ export class WriterClient {
this._connections.connectionList.map(conn => {
conn.close();
});
this._connections = {
connectionList: [],
};
this._open = false;
}

Expand Down
89 changes: 89 additions & 0 deletions system-test/managed_writer_client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ import * as assert from 'assert';
import {describe, it, xit} from 'mocha';
import * as uuid from 'uuid';
import * as gax from 'google-gax';
import * as sinon from 'sinon';
import {BigQuery, TableSchema} from '@google-cloud/bigquery';
import * as protos from '../protos/protos';
import * as bigquerywriter from '../src';
import * as protobuf from 'protobufjs';
import {ClientOptions} from 'google-gax';
import * as customerRecordProtoJson from '../samples/customer_record.json';

const sandbox = sinon.createSandbox();
afterEach(() => sandbox.restore());

const {managedwriter, adapt} = bigquerywriter;
const {WriterClient, Writer, JSONWriter, parseStorageErrors} = managedwriter;
const {Type} = protobuf;
Expand Down Expand Up @@ -780,6 +784,91 @@ describe('managedwriter.WriterClient', () => {
}
});

it('should trigger reconnection given some specific errors', async () => {
bqWriteClient.initialize();
const client = new WriterClient();
client.setClient(bqWriteClient);

const connection = await client.createStreamConnection({
streamType: managedwriter.PendingStream,
destinationTable: parent,
});

let reconnectedCalled = false;
sandbox.stub(connection, 'reconnect').callsFake(() => {
reconnectedCalled = true;
});

const writer = new JSONWriter({
connection,
protoDescriptor,
});

try {
// Write some data and trigger error
const pw = writer.appendRows(
[
{
customer_name: 'Ada Lovelace',
row_num: 1,
},
{
customer_name: 'Alan Turing',
row_num: 2,
},
],
0
);
await pw.getResult();

const reconnectErrorCases: gax.GoogleError[] = [
{
code: gax.Status.ABORTED,
msg: 'Closing the stream because it has been inactive',
},
{
code: gax.Status.RESOURCE_EXHAUSTED,
msg: 'read econnreset',
},
{
code: gax.Status.ABORTED,
msg: 'service is currently unavailable', // schema mismatch
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
},
{
code: gax.Status.RESOURCE_EXHAUSTED,
msg: 'bandwidth exhausted',
},
{
code: gax.Status.RESOURCE_EXHAUSTED,
msg: 'memory limit exceeded',
},
{
code: gax.Status.CANCELLED,
msg: 'any',
},
{
code: gax.Status.DEADLINE_EXCEEDED,
msg: 'a msg',
},
].map(err => {
const gerr = new gax.GoogleError(err.msg);
gerr.code = err.code;
return gerr;
});
for (const gerr of reconnectErrorCases) {
const conn = connection['_connection'] as gax.CancellableStream; // private method
conn.emit('error', gerr);
assert.equal(reconnectedCalled, true);

reconnectedCalled = false; // reset flag
}

writer.close();
} finally {
client.close();
}
});

xit('reconnect on idle connection', async () => {
bqWriteClient.initialize();
const client = new WriterClient();
Expand Down
Loading