Skip to content

Commit

Permalink
fix changefeed diagnostics (#27091)
Browse files Browse the repository at this point in the history
### Packages impacted by this PR
@azure/cosmos

### Issues associated with this PR
N/A

### Describe the problem that is addressed by this PR

- Removes check for empty 200 response from backend in async iterator
which earlier led to empty diagnostics
- Add diagnostics to read container in changefeed

### What are the possible designs available to address the problem? If
there are more than one possible design, why was the one in this PR
chosen?


### Are there test cases added in this PR? _(If not, why?)_


### Provide a list of related PRs _(if any)_


### Command used to generate this PR:**_(Applicable only to SDK release
request PRs)_

### Checklists
- [ ] Added impacted package name to the issue description
- [ ] Does this PR needs any fixes in the SDK Generator?** _(If so,
create an Issue in the
[Autorest/typescript](https://github.com/Azure/autorest.typescript)
repository and link it here)_
- [ ] Added a changelog (if necessary)
  • Loading branch information
amanrao23 authored Sep 12, 2023
1 parent 300b9bb commit 4b34500
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 24 deletions.
14 changes: 4 additions & 10 deletions sdk/cosmosdb/cosmos/src/client/ChangeFeed/ChangeFeedForEpkRange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ export class ChangeFeedForEpkRange<T> implements ChangeFeedPullModelIterator<T>
this.isInstantiated = false;
}

private async setIteratorRid(): Promise<void> {
const { resource } = await this.container.read();
private async setIteratorRid(diagnosticNode: DiagnosticNodeInternal): Promise<void> {
const { resource } = await this.container.readInternal(diagnosticNode);
this.rId = resource._rid;
}

Expand Down Expand Up @@ -156,13 +156,7 @@ export class ChangeFeedForEpkRange<T> implements ChangeFeedPullModelIterator<T>
public async *getAsyncIterator(): AsyncIterable<ChangeFeedIteratorResponse<Array<T & Resource>>> {
do {
const result = await this.readNext();
// filter out some empty 200 responses from backend.
if (
(result.count === 0 && result.statusCode === StatusCodes.NotModified) ||
(result.count > 0 && result.statusCode === StatusCodes.Ok)
) {
yield result;
}
yield result;
} while (this.hasMoreResults);
}

Expand All @@ -177,7 +171,7 @@ export class ChangeFeedForEpkRange<T> implements ChangeFeedPullModelIterator<T>
return withDiagnostics(async (diagnosticNode: DiagnosticNodeInternal) => {
// validate if the internal queue is filled up with feed ranges.
if (!this.isInstantiated) {
await this.setIteratorRid();
await this.setIteratorRid(diagnosticNode);
await this.fillChangeFeedQueue(diagnosticNode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { InternalChangeFeedIteratorOptions } from "./InternalChangeFeedOptions";
import { ChangeFeedIteratorResponse } from "./ChangeFeedIteratorResponse";
import { Container, Resource } from "../../client";
import { ClientContext } from "../../ClientContext";
import { Constants, ResourceType, StatusCodes } from "../../common";
import { Constants, ResourceType } from "../../common";
import { FeedOptions, Response, ErrorResponse } from "../../request";
import { ContinuationTokenForPartitionKey } from "./ContinuationTokenForPartitionKey";
import { ChangeFeedPullModelIterator } from "./ChangeFeedPullModelIterator";
Expand Down Expand Up @@ -43,8 +43,8 @@ export class ChangeFeedForPartitionKey<T> implements ChangeFeedPullModelIterator
}
}

private async instantiateIterator(): Promise<void> {
await this.setIteratorRid();
private async instantiateIterator(diagnosticNode: DiagnosticNodeInternal): Promise<void> {
await this.setIteratorRid(diagnosticNode);
if (this.continuationToken) {
if (!this.continuationTokenRidMatchContainerRid()) {
throw new ErrorResponse("The continuation is not for the current container definition.");
Expand All @@ -67,8 +67,8 @@ export class ChangeFeedForPartitionKey<T> implements ChangeFeedPullModelIterator
return true;
}

private async setIteratorRid(): Promise<void> {
const { resource } = await this.container.read();
private async setIteratorRid(diagnosticNode: DiagnosticNodeInternal): Promise<void> {
const { resource } = await this.container.readInternal(diagnosticNode);
this.rId = resource._rid;
}

Expand All @@ -85,13 +85,7 @@ export class ChangeFeedForPartitionKey<T> implements ChangeFeedPullModelIterator
public async *getAsyncIterator(): AsyncIterable<ChangeFeedIteratorResponse<Array<T & Resource>>> {
do {
const result = await this.readNext();
// filter out some empty 200 responses from backend.
if (
(result.count === 0 && result.statusCode === StatusCodes.NotModified) ||
(result.count > 0 && result.statusCode === StatusCodes.Ok)
) {
yield result;
}
yield result;
} while (this.hasMoreResults);
}

Expand All @@ -101,7 +95,7 @@ export class ChangeFeedForPartitionKey<T> implements ChangeFeedPullModelIterator
public async readNext(): Promise<ChangeFeedIteratorResponse<Array<T & Resource>>> {
return withDiagnostics(async (diagnosticNode: DiagnosticNodeInternal) => {
if (!this.isInstantiated) {
await this.instantiateIterator();
await this.instantiateIterator(diagnosticNode);
}
const result = await this.fetchNext(diagnosticNode);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ describe("Change Feed Iterator", function (this: Suite) {
},
{
retryCount: 0,
metadataCallCount: 2,
metadataCallCount: 4,
locationEndpointsContacted: 1,
requestStartTimeUTCInMsLowerLimit: startTimestamp,
requestDurationInMsUpperLimit: getCurrentTimestampInMs() - startTimestamp,
Expand Down

0 comments on commit 4b34500

Please sign in to comment.