Skip to content

Commit

Permalink
Add retry logic and test for watch document
Browse files Browse the repository at this point in the history
  • Loading branch information
chacha912 committed Oct 23, 2024
1 parent c699217 commit 0b3d3a9
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 0 deletions.
11 changes: 11 additions & 0 deletions packages/sdk/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,7 @@ export class Client {
}

this.conditions[ClientCondition.WatchLoop] = true;
let retryCount = 0;
return attachment.runWatchLoop(
(onDisconnect: () => void): Promise<[WatchStream, AbortController]> => {
if (!this.isActive()) {
Expand Down Expand Up @@ -948,6 +949,15 @@ export class Client {
err instanceof ConnectError &&
errorCodeOf(err) === Code.ErrUnauthenticated
) {
if (retryCount >= this.maxRequestRetries) {
logger.error(
`[WD] c:"${this.getKey()}" max retries (${
this.maxRequestRetries
}) exceeded`,
);
reject(err);
return;
}
attachment.doc.publish([
{
type: DocEventType.AuthError,
Expand All @@ -959,6 +969,7 @@ export class Client {
]);
}
onDisconnect();
retryCount++;
} else {
this.conditions[ClientCondition.WatchLoop] = false;
}
Expand Down
97 changes: 97 additions & 0 deletions packages/sdk/test/integration/webhook_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { describe, it, vi, beforeAll, afterAll, expect } from 'vitest';
import yorkie, {
SyncMode,
DocumentSyncStatus,
DocEventType,
} from '@yorkie-js-sdk/src/yorkie';
import {
assertThrowsAsync,
Expand Down Expand Up @@ -353,4 +354,100 @@ describe('Auth Webhook', () => {
await client.detach(doc);
await client.deactivate();
});

it('should refresh token and retry watch document', async ({ task }) => {
// Create New project
const projectResponse = await axios.post(
`${testRPCAddr}/yorkie.v1.AdminService/CreateProject`,
{ name: `auth-webhook-${new Date().getTime()}` },
{
headers: { Authorization: adminToken },
},
);
const projectId = projectResponse.data.project.id;
apiKey = projectResponse.data.project.publicKey;

// Update project with webhook url and methods
await axios.post(
`${testRPCAddr}/yorkie.v1.AdminService/UpdateProject`,
{
id: projectId,
fields: {
auth_webhook_url: `http://127.0.0.1:${webhookServerPort}/auth-webhook`,
auth_webhook_methods: { methods: ['WatchDocuments'] },
},
},
{
headers: { Authorization: adminToken },
},
);

const TokenExpirationMs = 2000;
const authTokenInjector = vi.fn(async (authErrorMessage) => {
if (authErrorMessage === ExpiredTokenErrorMessage) {
return `token-${Date.now() + TokenExpirationMs}`;
}
return `token-${Date.now()}`;
});
// client with token
const client = new yorkie.Client(testRPCAddr, {
apiKey,
authTokenInjector,
retryRequestDelay: 0,
});

await client.activate();
const docKey = toDocKey(`${task.name}-${new Date().getTime()}`);
const doc = new yorkie.Document<{ k1: string }>(docKey);

const authErrorEventCollector = new EventCollector<{
errorMessage: string;
method: string;
}>();
doc.subscribe('auth-error', (event) => {
authErrorEventCollector.add(event.value);
});

// Another client for verifying if the watchDocument is working properly
const client2 = new yorkie.Client(testRPCAddr, {
apiKey,
authTokenInjector: async () => {
return `token-${Date.now() + 1000 * 60 * 60}`; // expire in 1 hour
},
retryRequestDelay: 0,
});
await client2.activate();
const doc2 = new yorkie.Document<{ k1: string }>(docKey);
await client2.attach(doc2);

const presenceEventCollector = new EventCollector();
doc2.subscribe('presence', (event) => {
presenceEventCollector.add(event.type);
});

// retry watch document
await new Promise((res) => setTimeout(res, TokenExpirationMs));
await client.attach(doc);
await authErrorEventCollector.waitFor({
errorMessage: ExpiredTokenErrorMessage,
method: 'WatchDocuments',
});
expect(authTokenInjector).toBeCalledTimes(2);
expect(authTokenInjector).nthCalledWith(1);
expect(authTokenInjector).nthCalledWith(2, ExpiredTokenErrorMessage);
await presenceEventCollector.waitFor(DocEventType.Watched);

const syncEventCollector = new EventCollector();
doc.subscribe('sync', (event) => {
syncEventCollector.add(event.value);
});
doc2.update((root) => {
root.k1 = 'v1';
});
await syncEventCollector.waitFor(DocumentSyncStatus.Synced);
expect(doc.getRoot().k1).toBe('v1');

await client.detach(doc);
await client.deactivate();
});
});

0 comments on commit 0b3d3a9

Please sign in to comment.