Skip to content

Commit

Permalink
chore: Rebase onto latest change
Browse files Browse the repository at this point in the history
  • Loading branch information
stocaaro committed Aug 19, 2022
1 parent 68ff8c9 commit 5a7eb71
Show file tree
Hide file tree
Showing 6 changed files with 417 additions and 208 deletions.
187 changes: 137 additions & 50 deletions packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,18 @@ describe('AWSAppSyncRealTimeProvider', () => {
);

let provider: AWSAppSyncRealTimeProvider;
let reachabilityObserver: ZenObservable.Observer<{ online: boolean }>;

beforeEach(async () => {
// Set the network to "online" for these tests
jest
.spyOn(Reachability.prototype, 'networkMonitor')
.mockImplementationOnce(() => {
return new Observable(observer => {
reachabilityObserver = observer;
});
});

fakeWebSocketInterface = new FakeWebSocketInterface();
provider = new AWSAppSyncRealTimeProvider();

Expand All @@ -96,19 +106,10 @@ describe('AWSAppSyncRealTimeProvider', () => {
Object.defineProperty(constants, 'MAX_DELAY_MS', {
value: 100,
});

// Set the network to "online" for these tests
const spyon = jest
.spyOn(Reachability.prototype, 'networkMonitor')
.mockImplementationOnce(
() =>
new Observable(observer => {
observer.next?.({ online: true });
})
);
});

afterEach(async () => {
provider?.close();
await fakeWebSocketInterface?.closeInterface();
fakeWebSocketInterface?.teardown();
loggerSpy.mockClear();
Expand Down Expand Up @@ -383,15 +384,9 @@ describe('AWSAppSyncRealTimeProvider', () => {
error: () => {},
});
await fakeWebSocketInterface?.standardConnectionHandshake();
await fakeWebSocketInterface?.sendMessage(
new MessageEvent('start_ack', {
data: JSON.stringify({
type: MESSAGE_TYPES.GQL_START_ACK,
payload: { connectionTimeoutMs: 100 },
id: fakeWebSocketInterface?.webSocket.subscriptionId,
}),
})
);
await fakeWebSocketInterface?.startAckMessage({
connectionTimeoutMs: 100,
});
await fakeWebSocketInterface?.sendDataMessage({
type: MESSAGE_TYPES.GQL_DATA,
payload: { data: {} },
Expand All @@ -415,15 +410,9 @@ describe('AWSAppSyncRealTimeProvider', () => {
error: () => {},
});
await fakeWebSocketInterface?.standardConnectionHandshake();
await fakeWebSocketInterface?.sendMessage(
new MessageEvent('start_ack', {
data: JSON.stringify({
type: MESSAGE_TYPES.GQL_START_ACK,
payload: { connectionTimeoutMs: 100 },
id: fakeWebSocketInterface?.webSocket.subscriptionId,
}),
})
);
await fakeWebSocketInterface?.startAckMessage({
connectionTimeoutMs: 100,
});
await fakeWebSocketInterface?.sendDataMessage({
type: MESSAGE_TYPES.GQL_DATA,
payload: { data: {} },
Expand Down Expand Up @@ -573,29 +562,13 @@ describe('AWSAppSyncRealTimeProvider', () => {
async () => {
await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.sendMessage(
new MessageEvent('connection_ack', {
data: JSON.stringify({
type: constants.MESSAGE_TYPES.GQL_CONNECTION_ACK,
payload: { connectionTimeoutMs: 100 },
}),
})
);

await fakeWebSocketInterface?.sendMessage(
new MessageEvent('start_ack', {
data: JSON.stringify({
type: MESSAGE_TYPES.GQL_START_ACK,
payload: {},
id: fakeWebSocketInterface?.webSocket.subscriptionId,
}),
})
);

await fakeWebSocketInterface?.sendDataMessage({
type: MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE,
payload: { data: {} },
await fakeWebSocketInterface?.handShakeMessage({
connectionTimeoutMs: 100,
});

await fakeWebSocketInterface?.startAckMessage();

await fakeWebSocketInterface?.keepAlive();
}
);

Expand All @@ -618,6 +591,120 @@ describe('AWSAppSyncRealTimeProvider', () => {
);
});

test('subscription connection disruption triggers automatic reconnection', async () => {
expect.assertions(1);

const observer = provider.subscribe('test', {
appSyncGraphqlEndpoint: 'ws://localhost:8080',
});

const subscription = observer.subscribe({ error: () => {} });
// Resolve the message delivery actions

await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.handShakeMessage({
connectionTimeoutMs: 100,
});
await fakeWebSocketInterface?.startAckMessage();
await fakeWebSocketInterface.keepAlive();

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

// Wait until the socket is automatically disconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectionDisrupted,
]);

await fakeWebSocketInterface?.triggerOpen();

await fakeWebSocketInterface?.handShakeMessage({
connectionTimeoutMs: 100,
});
fakeWebSocketInterface?.startAckMessage();
await fakeWebSocketInterface.keepAlive();

// Wait until the socket is automatically reconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([
CS.Disconnected,
CS.Connecting,
CS.Connected,
CS.ConnectionDisrupted,
CS.Connecting,
CS.Connected,
]);
});

test('subscription connection disruption by network outage triggers automatic reconnection once network recovers', async () => {
expect.assertions(1);

const observer = provider.subscribe('test', {
appSyncGraphqlEndpoint: 'ws://localhost:8080',
});

const subscription = observer.subscribe({ error: () => {} });
// Resolve the message delivery actions

await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.handShakeMessage({
connectionTimeoutMs: 100,
});
await fakeWebSocketInterface?.startAckMessage();
await fakeWebSocketInterface.keepAlive();

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

reachabilityObserver?.next?.({ online: false });

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectedPendingNetwork,
]);

fakeWebSocketInterface?.closeInterface();

// Wait until the socket is automatically disconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectionDisruptedPendingNetwork,
]);

reachabilityObserver?.next?.({ online: true });

// Wait until the socket is automatically disconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectionDisrupted,
]);

await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.handShakeMessage();

await fakeWebSocketInterface?.startAckMessage();

// Wait until the socket is automatically reconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([
CS.Disconnected,
CS.Connecting,
CS.Connected,
CS.ConnectedPendingNetwork,
CS.ConnectionDisruptedPendingNetwork,
CS.ConnectionDisrupted,
CS.Connecting,
CS.Connected,
]);
});

test('socket is closed when subscription is closed', async () => {
expect.assertions(1);

Expand Down
Loading

0 comments on commit 5a7eb71

Please sign in to comment.