Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(@aws-amplify/datastore): mutation hub event drops during reconnect #11132

Merged
merged 2 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 65 additions & 10 deletions packages/datastore/__tests__/connectivityHandling.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ describe('DataStore sync engine', () => {
} = getDataStore({ online: true, isNode: false });

beforeEach(async () => {
// we don't need to see all the console warnings for these tests ...
(console as any)._warn = console.warn;
console.warn = () => {};

({
DataStore,
schema,
Expand Down Expand Up @@ -77,6 +81,7 @@ describe('DataStore sync engine', () => {

afterEach(async () => {
await DataStore.clear();
console.warn = (console as any)._warn;
});

describe('basic protocol', () => {
Expand Down Expand Up @@ -498,16 +503,7 @@ describe('DataStore sync engine', () => {
expect(cloudAnotherPost.title).toEqual('another title');
});

/**
* Existing bug. (Sort of.)
*
* Outbox mutations are processed, but the hub events are not sent, so
* the test hangs and times out. :shrug:
*
* It is notable that the data is correct in this case, we just don't
* receive all of the expected Hub events.
*/
test.skip('survives online -> offline -> save/online race', async () => {
test('survives online -> offline -> save/online race', async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on also adding tests for update and delete immediately before reconnect?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I have a compelling reason not to! I'll add them in.

const post = await DataStore.save(
new Post({
title: 'a title',
Expand Down Expand Up @@ -542,6 +538,65 @@ describe('DataStore sync engine', () => {
) as any;
expect(cloudAnotherPost.title).toEqual('another title');
});

test('survives online -> offline -> update/online race', async () => {
const post = await DataStore.save(
new Post({
title: 'a title',
})
);

await waitForEmptyOutbox();
await simulateDisconnect();

const outboxEmpty = waitForEmptyOutbox();

const retrieved = await DataStore.query(Post, post.id);
await DataStore.save(
Post.copyOf(retrieved!, updated => (updated.title = 'new title'))
);

// NO PAUSE: Simulate reconnect IMMEDIATELY, causing a race
// between the save and the sync engine reconnection operations.

await simulateConnect();
await outboxEmpty;

const table = graphqlService.tables.get('Post')!;
expect(table.size).toEqual(1);

const cloudPost = table.get(JSON.stringify([post.id])) as any;
expect(cloudPost.title).toEqual('new title');
});

test('survives online -> offline -> delete/online race', async () => {
const post = await DataStore.save(
new Post({
title: 'a title',
})
);

await waitForEmptyOutbox();
await simulateDisconnect();

const outboxEmpty = waitForEmptyOutbox();

const retrieved = await DataStore.query(Post, post.id);
await DataStore.delete(retrieved!);

// NO PAUSE: Simulate reconnect IMMEDIATELY, causing a race
// between the save and the sync engine reconnection operations.

await simulateConnect();
await outboxEmpty;

const table = graphqlService.tables.get('Post')!;
expect(table.size).toEqual(1);

const cloudPost = table.get(JSON.stringify([post.id])) as any;
expect(cloudPost.title).toEqual('a title');
expect(cloudPost._deleted).toEqual(true);
});
});

describe('selective sync', () => {
Expand Down
1 change: 1 addition & 0 deletions packages/datastore/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ export class SyncEngine {

await startPromise;

// Set by the this.datastoreConnectivity.status().subscribe() loop
if (this.online) {
this.mutationsProcessor.resume();
}
Expand Down
20 changes: 18 additions & 2 deletions packages/datastore/src/sync/processors/mutation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ type MutationProcessorEvent = {
};

class MutationProcessor {
private observer!: ZenObservable.Observer<MutationProcessorEvent>;
/**
* The observer that receives messages when mutations are successfully completed
* against cloud storage.
*
* A value of `undefined` signals that the sync has either been stopped or has not
* yet started. In this case, `isReady()` will be `false` and `resume()` will exit
* early.
*/
private observer?: ZenObservable.Observer<MutationProcessorEvent>;
private readonly typeQuery = new WeakMap<
SchemaModel,
[TransformerMutationType, string, string][]
Expand Down Expand Up @@ -130,6 +138,8 @@ class MutationProcessor {
}

return this.runningProcesses.addCleaner(async () => {
// The observer has unsubscribed and/or `stop()` has been called.
this.removeObserver();
this.pause();
});
});
Expand All @@ -138,10 +148,16 @@ class MutationProcessor {
}

public async stop() {
this.removeObserver();
await this.runningProcesses.close();
await this.runningProcesses.open();
}

public removeObserver() {
this.observer?.complete?.();
this.observer = undefined;
}

public async resume(): Promise<void> {
await (this.runningProcesses.isOpen &&
this.runningProcesses.add(async onTerminate => {
Expand Down Expand Up @@ -256,7 +272,7 @@ class MutationProcessor {
hasMore = (await this.outbox.peek(storage)) !== undefined;
});

this.observer.next!({
this.observer?.next?.({
Copy link
Contributor

@david-mcafee david-mcafee Mar 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's entirely possible I'm being dense, but according to the PR description: "An undefined observer signals to the mutation processor not to immediately resume on mutation, but instead wait for an observer (the sync engine) to register itself and re-start() the mutation processor." Wouldn't this mean that resume doesn't happen if the observer is undefined? And if so, why is it possible that the observer is possibly undefined, here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrestled with this little part of the diff for longer than I care to admit, actually. TypeScript isn't convinced that this.observer always exists here, or that .next exists on the observer. So, with that said, I'm actually not 100% sure PR eliminates all cases where a mutation event goes missing. It does provide an incremental improvement to one edge I identified while trying to add tests.

Maybe it makes more sense to revert to bangs to at least get errors logged if someone hits that edge. I think we can shake out more edges after #11131, which will give us a little more control to stress the protocol.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, though we check that the observer isn't undefined at the beginning of resume (i.e. the status of isReady), it's possible that between there and here, the observer can now become undefined. I think I'm just most concerned about silent failures, and think that error logs would be better. Thoughts?

Copy link
Member Author

@svidgen svidgen Mar 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, there are 100% still some potential edges here. This PR only addresses a semi-related edge wherein reconnect fundamentally pits mutation processing and sync engine restart against each other. I want to be cautious about changing the failure mode at this line. Today, it will be silent if the observer has unsubscribed, AFAIK. This line will happily call .next() on a closed subscriber, if I'm not mistaken and silently deliver that message into the ether.

I could be wrong. Happy to be shown I'm wrong. I just want to avoid entangling the purpose of this PR with the lesser-known edges here. This is especially true if we don't have any repros or reports of those edges, and when the right solution there is really unclear to me.

I dunno. Happy to chat on Monday about this if needed!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line will happily call .next() on a closed subscriber, if I'm not mistaken and silently deliver that message into the ether.

Just following up on this, I've tested my recollection on this and it looks correct. Completed and/or unsubscribed observers simply cause next() to be a no-op. No error message will be logged. It will just a silent failure. I'll create a backlog item for this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think TypeScript is just confused that this.isReady actually asserts that this.observer is defined. Even if you place this.isReady directly before this.observer.next you will still get the same error.

if (this.isReady()) {
  // still thinks that this.observer could be undefined
  this.observer.next ...

You could use assertion predicates to give TypeScript enough information to know it would be defined in this code path. But it looks like it isn't well supported on private class properties and would require some hacks.

https://www.typescriptlang.org/docs/handbook/release-notes/typescript-3-7.html#assertion-functions

https://stackoverflow.com/questions/74264474/in-typescript-how-do-i-assert-the-value-of-a-class-variable-using-a-method

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's still a potential issue that could occur if an unsubscribe occurs while a request is in flight. But, it's an existing and less likely race, outside the scope of the more predictable and easy to repro race this PR addresses.

In any case, there's a story in the backlog and on the grooming list to revisit this at some point.

operation,
modelDefinition,
model: record,
Expand Down