Skip to content

Commit

Permalink
Add liveness mechanism in runWatchLoop
Browse files Browse the repository at this point in the history
  • Loading branch information
krapie committed Feb 2, 2025
1 parent 0754863 commit 67aa970
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 83 deletions.
2 changes: 1 addition & 1 deletion packages/sdk/public/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ <h4 class="title">
devtool.setCodeMirror(codemirror);

// 02-1. create client with RPCAddr.
const client = new yorkie.Client('http://localhost:8080');
const client = new yorkie.Client('http://localhost:80');
// 02-2. activate client
await client.activate();

Expand Down
272 changes: 190 additions & 82 deletions packages/sdk/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ export interface ClientOptions {
* default value is `1000`(ms).
*/
reconnectStreamDelay?: number;

/**
* `reconnectionStreamTimeout` is the liveness timeout of the watch stream. If
* no event is received from the stream for this duration, the client aborts
* the stream so that it can be reconnected. The default value is `30000`(ms).
*/
reconnectionStreamTimeout?: number;
}

/**
Expand All @@ -164,6 +171,7 @@ const DefaultClientOptions = {
syncLoopDuration: 50,
retrySyncLoopDelay: 1000,
reconnectStreamDelay: 1000,
reconnectionStreamTimeout: 30000,
};

/**
Expand Down Expand Up @@ -193,6 +201,7 @@ export class Client {
private conditions: Record<ClientCondition, boolean>;
private syncLoopDuration: number;
private reconnectStreamDelay: number;
private reconnectionStreamTimeout: number;
private retrySyncLoopDelay: number;

private rpcClient: PromiseClient<typeof YorkieService>;
Expand Down Expand Up @@ -223,6 +232,9 @@ export class Client {
opts.syncLoopDuration ?? DefaultClientOptions.syncLoopDuration;
this.reconnectStreamDelay =
opts.reconnectStreamDelay ?? DefaultClientOptions.reconnectStreamDelay;
this.reconnectionStreamTimeout =
opts.reconnectionStreamTimeout ??
DefaultClientOptions.reconnectionStreamTimeout;
this.retrySyncLoopDelay =
opts.retrySyncLoopDelay ?? DefaultClientOptions.retrySyncLoopDelay;

Expand Down Expand Up @@ -396,7 +408,10 @@ export class Client {
);

if (syncMode !== SyncMode.Manual) {
await this.runWatchLoop(doc.getKey());
await this.runWatchLoop(
doc.getKey(),
this.reconnectionStreamTimeout,
);
}

logger.info(`[AD] c:"${this.getKey()}" attaches d:"${doc.getKey()}"`);
Expand Down Expand Up @@ -528,7 +543,7 @@ export class Client {

// manual to realtime
if (prevSyncMode === SyncMode.Manual) {
await this.runWatchLoop(doc.getKey());
await this.runWatchLoop(doc.getKey(), this.reconnectionStreamTimeout);
}

return doc;
Expand Down Expand Up @@ -828,7 +843,10 @@ export class Client {
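* `runWatchLoop` runs the watch loop for the given document. The watch loop
* listens to the events of the given document from the server. If no event
* arrives within `reconnectionStreamTimeout`(ms), the stream is aborted so
* that it can be reconnected; the timeout doubles on each consecutive
* failure, up to a maximum of 300000ms.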
*/
private async runWatchLoop(docKey: DocKey): Promise<void> {
private async runWatchLoop(
docKey: DocKey,
reconnectionStreamTimeout: number,
): Promise<void> {
const attachment = this.attachmentMap.get(docKey);
if (!attachment) {
throw new YorkieError(
Expand All @@ -838,95 +856,185 @@ export class Client {
}

this.conditions[ClientCondition.WatchLoop] = true;
return attachment.runWatchLoop(
(onDisconnect: () => void): Promise<[WatchStream, AbortController]> => {
if (!this.isActive()) {
this.conditions[ClientCondition.WatchLoop] = false;
return Promise.reject(
new YorkieError(
Code.ErrClientNotActivated,
`${this.key} is not active`,
),
);
}
let lastEventTime: number = Date.now();
let currentTimeout: number = reconnectionStreamTimeout;
let consecutiveTimeouts: number = 0;

const ac = new AbortController();
const stream = this.rpcClient.watchDocument(
{
clientId: this.id!,
documentId: attachment.docID,
},
{
headers: { 'x-shard-key': `${this.apiKey}/${docKey}` },
signal: ac.signal,
},
logger.info(
`[WD] c:"${this.getKey()}" initializing watch loop with base timeout: ${reconnectionStreamTimeout}ms`,
);

const resetEventTimer = () => {
const previousEventTime = lastEventTime;
lastEventTime = Date.now();
logger.debug(
`[WD] c:"${this.getKey()}" event received after ${Date.now() - previousEventTime}ms`,
);

// Reset timeout if we're getting regular events
if (consecutiveTimeouts > 0) {
logger.info(
`[WD] c:"${this.getKey()}" resetting timeout from ${currentTimeout}ms to ${reconnectionStreamTimeout}ms after ${consecutiveTimeouts} timeouts`,
);
consecutiveTimeouts = 0;
currentTimeout = reconnectionStreamTimeout;
}
};

attachment.doc.publish([
{
type: DocEventType.ConnectionChanged,
value: StreamConnectionStatus.Connected,
},
]);
logger.info(`[WD] c:"${this.getKey()}" watches d:"${docKey}"`);

return new Promise((resolve, reject) => {
const handleStream = async () => {
try {
for await (const resp of stream) {
this.handleWatchDocumentsResponse(attachment, resp);

// NOTE(hackerwins): When the first response is received, we need to
// resolve the promise to notify that the watch stream is ready.
if (resp.body.case === 'initialization') {
resolve([stream, ac]);
}
const calculateNextTimeout = () => {
consecutiveTimeouts++;
const previousTimeout = currentTimeout;
// Apply exponential backoff with a maximum of 5 minutes (300000ms)
currentTimeout = Math.min(
reconnectionStreamTimeout * Math.pow(2, consecutiveTimeouts - 1),
300000,
);
logger.info(
`[WD] c:"${this.getKey()}" increasing timeout from ${previousTimeout}ms to ${currentTimeout}ms (consecutive timeouts: ${consecutiveTimeouts})`,
);
return currentTimeout;
};

const attemptConnection = async (
onDisconnect: () => void,
): Promise<[WatchStream, AbortController]> => {
if (!this.isActive()) {
logger.warn(
`[WD] c:"${this.getKey()}" client not active, rejecting connection attempt`,
);
this.conditions[ClientCondition.WatchLoop] = false;
return Promise.reject(
new YorkieError(
Code.ErrClientNotActivated,
`${this.key} is not active`,
),
);
}

logger.debug(
`[WD] c:"${this.getKey()}" attempting connection with timeout ${currentTimeout}ms`,
);
const ac = new AbortController();
const stream = this.rpcClient.watchDocument(
{
clientId: this.id!,
documentId: attachment.docID,
},
{
headers: { 'x-shard-key': `${this.apiKey}/${docKey}` },
signal: ac.signal,
},
);

attachment.doc.publish([
{
type: DocEventType.ConnectionChanged,
value: StreamConnectionStatus.Connected,
},
]);
logger.info(
`[WD] c:"${this.getKey()}" established watch stream for d:"${docKey}" (timeout: ${currentTimeout}ms)`,
);

return new Promise((resolve, reject) => {
// Start event timeout checker
const timeoutChecker = setInterval(() => {
const timeSinceLastEvent = Date.now() - lastEventTime;
logger.debug(
`[WD] c:"${this.getKey()}" time since last event: ${timeSinceLastEvent}ms/${currentTimeout}ms`,
);

if (timeSinceLastEvent >= currentTimeout) {
logger.warn(
`[WD] c:"${this.getKey()}" timeout reached - no events for ${timeSinceLastEvent}ms (threshold: ${currentTimeout}ms)`,
);
clearInterval(timeoutChecker);
ac.abort();
}
}, 1000);

const handleStream = async () => {
try {
logger.debug(`[WD] c:"${this.getKey()}" starting stream handler`);
for await (const resp of stream) {
logger.debug(
`[WD] c:"${this.getKey()}" received ${resp.body.case} event`,
);
this.handleWatchDocumentsResponse(attachment, resp);

if (resp.body.case === 'initialization') {
logger.info(
`[WD] c:"${this.getKey()}" received initialization event`,
);
resolve([stream, ac]);
} else {
resetEventTimer();
}
} catch (err) {
attachment.doc.resetOnlineClients();
attachment.doc.publish([
{
type: DocEventType.Initialized,
source: OpSource.Local,
value: attachment.doc.getPresences(),
},
]);
attachment.doc.publish([
{
type: DocEventType.ConnectionChanged,
value: StreamConnectionStatus.Disconnected,
},
]);
logger.debug(`[WD] c:"${this.getKey()}" unwatches`);

if (await this.handleConnectError(err)) {
if (
err instanceof ConnectError &&
errorCodeOf(err) === Code.ErrUnauthenticated
) {
attachment.doc.publish([
{
type: DocEventType.AuthError,
value: {
reason: errorMetadataOf(err).reason,
method: 'WatchDocuments',
},
}
} catch (err) {
clearInterval(timeoutChecker);
logger.debug(
`[WD] c:"${this.getKey()}" stream error: ${err instanceof Error ? err.message : 'unknown error'}`,
);

attachment.doc.resetOnlineClients();
attachment.doc.publish([
{
type: DocEventType.Initialized,
source: OpSource.Local,
value: attachment.doc.getPresences(),
},
]);
attachment.doc.publish([
{
type: DocEventType.ConnectionChanged,
value: StreamConnectionStatus.Disconnected,
},
]);
logger.debug(`[WD] c:"${this.getKey()}" unwatches after error`);

if (await this.handleConnectError(err)) {
if (
err instanceof ConnectError &&
errorCodeOf(err) === Code.ErrUnauthenticated
) {
logger.error(
`[WD] c:"${this.getKey()}" authentication error, stopping reconnection`,
);
attachment.doc.publish([
{
type: DocEventType.AuthError,
value: {
reason: errorMetadataOf(err).reason,
method: 'WatchDocuments',
},
]);
}
onDisconnect();
} else {
this.conditions[ClientCondition.WatchLoop] = false;
},
]);
reject(err); // Don't retry on auth errors
return;
}

// Calculate next timeout duration with exponential backoff
const nextTimeout = calculateNextTimeout();
logger.info(
`[WD] c:"${this.getKey()}" initiating reconnection with new timeout: ${nextTimeout}ms`,
);
onDisconnect();
} else {
logger.error(
`[WD] c:"${this.getKey()}" unhandled error, stopping watch loop`,
);
this.conditions[ClientCondition.WatchLoop] = false;
reject(err);
}
};
}
};

handleStream();
});
},
);
handleStream();
});
};

return attachment.runWatchLoop(attemptConnection);
}

private handleWatchDocumentsResponse<T, P extends Indexable>(
Expand Down

0 comments on commit 67aa970

Please sign in to comment.