-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(@aws-amplify/datastore): mutation hub event drops during reconnect #11132
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,7 +56,15 @@ type MutationProcessorEvent = { | |
}; | ||
|
||
class MutationProcessor { | ||
private observer!: ZenObservable.Observer<MutationProcessorEvent>; | ||
/** | ||
* The observer that receives messages when mutations are successfully completed | ||
* against cloud storage. | ||
* | ||
* A value of `undefined` signals that the sync has either been stopped or has not | ||
* yet started. In this case, `isReady()` will be `false` and `resume()` will exit | ||
* early. | ||
*/ | ||
private observer?: ZenObservable.Observer<MutationProcessorEvent>; | ||
private readonly typeQuery = new WeakMap< | ||
SchemaModel, | ||
[TransformerMutationType, string, string][] | ||
|
@@ -130,6 +138,8 @@ class MutationProcessor { | |
} | ||
|
||
return this.runningProcesses.addCleaner(async () => { | ||
// The observer has unsubscribed and/or `stop()` has been called. | ||
this.removeObserver(); | ||
this.pause(); | ||
}); | ||
}); | ||
|
@@ -138,10 +148,16 @@ class MutationProcessor { | |
} | ||
|
||
public async stop() { | ||
this.removeObserver(); | ||
await this.runningProcesses.close(); | ||
await this.runningProcesses.open(); | ||
} | ||
|
||
public removeObserver() { | ||
this.observer?.complete?.(); | ||
this.observer = undefined; | ||
} | ||
|
||
public async resume(): Promise<void> { | ||
await (this.runningProcesses.isOpen && | ||
this.runningProcesses.add(async onTerminate => { | ||
|
@@ -256,7 +272,7 @@ class MutationProcessor { | |
hasMore = (await this.outbox.peek(storage)) !== undefined; | ||
}); | ||
|
||
this.observer.next!({ | ||
this.observer?.next?.({ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's entirely possible I'm being dense, but according to the PR description: "An There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wrestled with this little part of the diff for longer than I care to admit, actually. TypeScript isn't convinced that Maybe it makes more sense to revert to bangs to at least get errors logged if someone hits that edge. I think we can shake out more edges after #11131, which will give us a little more control to stress the protocol. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, though we check that the observer isn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, there are 100% still some potential edges here. This PR only addresses a semi-related edge wherein reconnect fundamentally pits mutation processing and sync engine restart against each other. I want to be cautious about changing the failure mode at this line. Today, it will be silent if the observer has unsubscribed, AFAIK. This line will happily call I could be wrong. Happy to be shown I'm wrong. I just want to avoid entangling the purpose of this PR with the lesser-known edges here. This is especially true if we don't have any repros or reports of those edges, and when the right solution there is really unclear to me. I dunno. Happy to chat on Monday about this if needed! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Just following up on this, I've tested my recollection on this and it looks correct. Completed and/or unsubscribed observers simply cause There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think TypeScript is just confused that
You could use assertion predicates to give TypeScript enough information to know it would be defined in this code path. But it looks like it isn't well supported on private class properties and would require some hacks. https://www.typescriptlang.org/docs/handbook/release-notes/typescript-3-7.html#assertion-functions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there's still a potential issue that could occur if an unsubscribe occurs while a request is in flight. But, it's an existing and less likely race, outside the scope of the more predictable and easy to repro race this PR addresses. In any case, there's a story in the backlog and on the grooming list to revisit this at some point. |
||
operation, | ||
modelDefinition, | ||
model: record, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thoughts on also adding tests for
update
anddelete
immediately before reconnect?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think I have a compelling reason not to! I'll add them in.