Skip to content

Commit

Permalink
fix: Reset the retry counter to 0 when receiving data (#1604)
Browse files Browse the repository at this point in the history
* Commented out for testing

* Add a bunch of logs

* more logs

* Save the logs on a branch

* Add mechanism for setting retries to 0

* Logging for unique bug

* First part of the cleanup

* Remove more console logs

* Remove the testExample test

* Remove variable

* Return package.json back to the way it was

* Add comment

* Eliminate console logs and comments

* Change the test to expect something more specific

* Adjust test with one little change

* Get rid of the commented code

* Remove space

* Remove indent

* Preserve existing behaviour for data events

* Update gax/test/test-application/src/index.ts

Co-authored-by: Leah E. Cole <6719667+leahecole@users.noreply.github.com>

---------

Co-authored-by: Leah E. Cole <6719667+leahecole@users.noreply.github.com>
  • Loading branch information
danieljbruce and leahecole authored May 27, 2024
1 parent 097516a commit 13b5d23
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 2 deletions.
7 changes: 6 additions & 1 deletion gax/src/streamingCalls/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
*/
streamHandoffHelper(stream: CancellableStream, retry: RetryOptions): void {
let enteredError = false;
const eventsToForward = ['metadata', 'response', 'status', 'data'];
const eventsToForward = ['metadata', 'response', 'status'];

eventsToForward.forEach(event => {
stream.on(event, this.emit.bind(this, event));
Expand All @@ -282,6 +282,11 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
this.streamHandoffErrorHandler(stream, retry, error);
});

stream.on('data', (data: ResponseType) => {
this.retries = 0;
this.emit.bind(this, 'data')(data);
});

stream.on('end', () => {
if (!enteredError) {
enteredError = true;
Expand Down
79 changes: 79 additions & 0 deletions gax/test/test-application/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ async function testShowcase() {
const restClient = new EchoClient(restClientOpts);
const restClientCompat = new EchoClient(restClientOptsCompat);

await testResetRetriesToZero(grpcSequenceClientWithServerStreamingRetries);

// assuming gRPC server is started locally
await testEcho(grpcClient);
await testEchoError(grpcClient);
Expand Down Expand Up @@ -714,6 +716,83 @@ async function testServerStreamingRetrieswithRetryRequestOptions(
});
}

// When the stream recieves data then the retry count should be set to 0
async function testResetRetriesToZero(client: SequenceServiceClient) {
const finalData: string[] = [];
const shouldRetryFn = (error: GoogleError) => {
return [4, 5, 6, 7].includes(error!.code!);
};
const backoffSettings = createBackoffSettings(
10000,
2.5,
1000,
null,
1.5,
3000,
null
);
// intentionally set maxRetries to a value less than
// the number of errors in the sequence
backoffSettings.maxRetries = 2;
const getResumptionRequestFn = (request: RequestType) => {
return request;
};

const retryOptions = new RetryOptions(
[],
backoffSettings,
shouldRetryFn,
getResumptionRequestFn
);

const settings = {
retry: retryOptions,
};

client.initialize();

const request = createStreamingSequenceRequestFactory(
[
Status.DEADLINE_EXCEEDED,
Status.NOT_FOUND,
Status.ALREADY_EXISTS,
Status.PERMISSION_DENIED,
Status.OK,
],
[0.1, 0.1, 0.1, 0.1, 0.1],
[1, 2, 3, 4, 5],
'This is testing the brand new and shiny StreamingSequence server 3'
);
const response = await client.createStreamingSequence(request);
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest();
attemptRequest.name = sequence.name!;

const attemptStream = client.attemptStreamingSequence(
attemptRequest,
settings
);
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', error => {
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();
resolve();
});
}).then(() => {
assert.deepStrictEqual(
finalData.join(' '),
'This This is This is testing This is testing the This is testing the brand'
);
});
}

// When maxRetries are set to 2 then on the third error from the server gax
// should throw an error that says the retry count has been exceeded.
async function testShouldFailOnThirdError(client: SequenceServiceClient) {
Expand Down
1 change: 0 additions & 1 deletion gax/test/unit/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,6 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
const s = new PassThrough({
objectMode: true,
});
s.push('hello');
setImmediate(() => {
s.emit('metadata');
});
Expand Down

0 comments on commit 13b5d23

Please sign in to comment.