Skip to content

Commit

Permalink
fix(platform/builder): Add heartbeat mechanism (#8665)
Browse files Browse the repository at this point in the history
* add heartbeat mechanism

* formatting data

* import List

* another import fix

* wip

* formatting adn linting
  • Loading branch information
aarushik93 authored Nov 18, 2024
1 parent 1f34f78 commit 8fccf2e
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 9 deletions.
11 changes: 6 additions & 5 deletions autogpt_platform/backend/backend/server/model.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import enum
import typing
from typing import Any, List, Optional, Union

import pydantic

Expand All @@ -12,11 +12,12 @@ class Methods(enum.Enum):
UNSUBSCRIBE = "unsubscribe"
EXECUTION_EVENT = "execution_event"
ERROR = "error"
HEARTBEAT = "heartbeat"


class WsMessage(pydantic.BaseModel):
method: Methods
data: typing.Dict[str, typing.Any] | list[typing.Any] | None = None
data: Optional[Union[dict[str, Any], list[Any], str]] = None
success: bool | None = None
channel: str | None = None
error: str | None = None
Expand All @@ -40,8 +41,8 @@ class CreateGraph(pydantic.BaseModel):

class CreateAPIKeyRequest(pydantic.BaseModel):
name: str
permissions: typing.List[APIKeyPermission]
description: typing.Optional[str] = None
permissions: List[APIKeyPermission]
description: Optional[str] = None


class CreateAPIKeyResponse(pydantic.BaseModel):
Expand All @@ -54,4 +55,4 @@ class SetGraphActiveVersion(pydantic.BaseModel):


class UpdatePermissionsRequest(pydantic.BaseModel):
permissions: typing.List[APIKeyPermission]
permissions: List[APIKeyPermission]
7 changes: 7 additions & 0 deletions autogpt_platform/backend/backend/server/ws_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ async def websocket_router(
while True:
data = await websocket.receive_text()
message = WsMessage.model_validate_json(data)

if message.method == Methods.HEARTBEAT:
await websocket.send_json(
{"method": Methods.HEARTBEAT.value, "data": "pong", "success": True}
)
continue

if message.method == Methods.SUBSCRIBE:
await handle_subscribe(websocket, manager, message)

Expand Down
64 changes: 60 additions & 4 deletions autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ export default class BaseAutoGPTServerAPI {
private wsConnecting: Promise<void> | null = null;
private wsMessageHandlers: Record<string, Set<(data: any) => void>> = {};
private supabaseClient: SupabaseClient | null = null;
heartbeatInterval: number | null = null;
readonly HEARTBEAT_INTERVAL = 30000; // 30 seconds
readonly HEARTBEAT_TIMEOUT = 10000; // 10 seconds
heartbeatTimeoutId: number | null = null;

constructor(
baseUrl: string = process.env.NEXT_PUBLIC_AGPT_SERVER_URL ||
Expand Down Expand Up @@ -324,34 +328,84 @@ export default class BaseAutoGPTServerAPI {
}
}

startHeartbeat() {
this.stopHeartbeat();
this.heartbeatInterval = window.setInterval(() => {
if (this.webSocket?.readyState === WebSocket.OPEN) {
this.webSocket.send(
JSON.stringify({
method: "heartbeat",
data: "ping",
success: true,
}),
);

this.heartbeatTimeoutId = window.setTimeout(() => {
console.log("Heartbeat timeout - reconnecting");
this.webSocket?.close();
this.connectWebSocket();
}, this.HEARTBEAT_TIMEOUT);
}
}, this.HEARTBEAT_INTERVAL);
}

stopHeartbeat() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
if (this.heartbeatTimeoutId) {
clearTimeout(this.heartbeatTimeoutId);
this.heartbeatTimeoutId = null;
}
}

handleHeartbeatResponse() {
if (this.heartbeatTimeoutId) {
clearTimeout(this.heartbeatTimeoutId);
this.heartbeatTimeoutId = null;
}
}

async connectWebSocket(): Promise<void> {
this.wsConnecting ??= new Promise(async (resolve, reject) => {
try {
const token =
(await this.supabaseClient?.auth.getSession())?.data.session
?.access_token || "";

const wsUrlWithToken = `${this.wsUrl}?token=${token}`;
this.webSocket = new WebSocket(wsUrlWithToken);

this.webSocket.onopen = () => {
console.debug("WebSocket connection established");
console.log("WebSocket connection established");
this.startHeartbeat(); // Start heartbeat when connection opens
resolve();
};

this.webSocket.onclose = (event) => {
console.debug("WebSocket connection closed", event);
console.log("WebSocket connection closed", event);
this.stopHeartbeat(); // Stop heartbeat when connection closes
this.webSocket = null;
// Attempt to reconnect after a delay
setTimeout(() => this.connectWebSocket(), 1000);
};

this.webSocket.onerror = (error) => {
console.error("WebSocket error:", error);
this.stopHeartbeat(); // Stop heartbeat on error
reject(error);
};

this.webSocket.onmessage = (event) => {
const message: WebsocketMessage = JSON.parse(event.data);
if (message.method == "execution_event") {

// Handle heartbeat response
if (message.method === "heartbeat" && message.data === "pong") {
this.handleHeartbeatResponse();
return;
}

if (message.method === "execution_event") {
message.data = parseNodeExecutionResultTimestamps(message.data);
}
this.wsMessageHandlers[message.method]?.forEach((handler) =>
Expand All @@ -367,6 +421,7 @@ export default class BaseAutoGPTServerAPI {
}

disconnectWebSocket() {
this.stopHeartbeat(); // Stop heartbeat when disconnecting
if (this.webSocket && this.webSocket.readyState === WebSocket.OPEN) {
this.webSocket.close();
}
Expand Down Expand Up @@ -423,6 +478,7 @@ type GraphCreateRequestBody =
type WebsocketMessageTypeMap = {
subscribe: { graph_id: string };
execution_event: NodeExecutionResult;
heartbeat: "ping" | "pong";
};

type WebsocketMessage = {
Expand Down

0 comments on commit 8fccf2e

Please sign in to comment.