Skip to content

Commit

Permalink
export stremaing module
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Jul 31, 2024
1 parent 3320368 commit 205fdd7
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 51 deletions.
4 changes: 2 additions & 2 deletions src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ def on_disconnect():
logger.debug("Client disconnected, closing tasks")
main_task.cancel()

return DisconnectHandlerStreamingResponse(consume_and_yield(asyncio_queue, main_task),
media_type="text/event-stream",
return DisconnectHandlerStreamingResponse(consume_and_yield(asyncio_queue),
media_type="application/x-ndjson",
on_disconnect=on_disconnect)

class DisconnectHandlerStreamingResponse(StreamingResponse):
Expand Down
84 changes: 82 additions & 2 deletions src/frontend/src/controllers/API/api.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ const api: AxiosInstance = axios.create({
baseURL: "",
});

const cookies = new Cookies();
function ApiInterceptor() {
const autoLogin = useAuthStore((state) => state.autoLogin);
const setErrorData = useAlertStore((state) => state.setErrorData);
let { accessToken, authenticationErrorCount } = useContext(AuthContext);
const cookies = new Cookies();

const setSaveLoading = useFlowsManagerStore((state) => state.setSaveLoading);
const { mutate: mutationLogout } = useLogout();
const { mutate: mutationRenewAccessToken } = useRefreshAccessToken();
Expand Down Expand Up @@ -193,4 +194,83 @@ function ApiInterceptor() {
return null;
}

export { ApiInterceptor, api };
export type StreamingRequestParams = {
method: string,
url: string,
onData: (event: object) => Promise<boolean>,
body?: object,
onError?: (statusCode: number) => void,
}

async function performStreamingRequest({
method,
url,
onData,
body,
onError}: StreamingRequestParams) {

let headers = {
"Content-Type": "application/json",
// this flag is fundamental to ensure server stops tasks when client disconnects
"Connection": "close"
};
const accessToken = cookies.get(LANGFLOW_ACCESS_TOKEN);
if (accessToken) {
headers["Authorization"] = `Bearer ${accessToken}`;
}
const params = {
method: method,
headers: headers
}
if (body) {
params["body"] = JSON.stringify(body)
}
const response = await fetch(url, params)
if (!response.ok) {
if (onError) {
onError(response.status)
} else {
throw new Error("error in streaming request")
}
}
if (response.body === null) {
return
}
let current: string[] = []
let textDecoder = new TextDecoder();

for await (const chunk of response.body) {
const decodedChunk = await textDecoder.decode(chunk);
let all = decodedChunk.split("\n\n");
for (const string of all) {
if (string.endsWith("}")) {
const allString = current.join("") + string
let data: object
try {
data = JSON.parse(allString)
current = []
} catch (e) {
current.push(string)
continue
}
const shouldContinue = await onData(data)
if (!shouldContinue) {
current = []
break
}
} else {
current.push(string)
}
}
}
if (current.length > 0) {
const allString = current.join("")
if (allString) {
const data = JSON.parse(current.join(""))
await onData(data)
}
}

}

export { ApiInterceptor, api, performStreamingRequest };
67 changes: 20 additions & 47 deletions src/frontend/src/utils/buildUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {VertexBuildTypeAPI} from "../types/api";
import {isErrorLogType} from "../types/utils/typeCheckingUtils";
import {VertexLayerElementType} from "../types/zustand/flow";
import {BASE_URL_API} from "@/constants/constants";
import {performStreamingRequest} from "@/controllers/API/api";

type BuildVerticesParams = {
setLockChat?: (lock: boolean) => void;
Expand Down Expand Up @@ -160,59 +161,15 @@ export async function buildFlowVertices({
if (files) {
postData["files"] = files;
}
async function startServerSideBuild(onEvent: (type: string, data: any) => Promise<boolean>) {

const response = await fetch(url, {
method: "POST",
body: JSON.stringify(postData),
headers: {
"Content-Type": "application/json",
"Connection": "close"
}})
if (response.status == 404) {
throw new Error("endpoint not available")
}
let current: string[] = []
let textDecoder = new TextDecoder();
for await (const chunk of response.body) {
const data = await textDecoder.decode(chunk);
let all = data.split("\n\n");
for (const string of all) {
if (string.endsWith("}")) {
const allString = current.join("") + string
let event
try {
event = JSON.parse(allString)
current = []
} catch (e) {
current.push(string)
continue
}
const shouldContinue = await onEvent(event.event, event.data)
if (!shouldContinue) {
current = []
break
}
} else {
current.push(string)
}
}
}
if (current.length > 0) {
const allString = current.join("")
if (allString) {
const event = JSON.parse(current.join(""))
await onEvent(event.event, event.data)
}
}
}



const buildResults: Array<boolean> = [];

const verticesStartTimeMs: Map<string, number> = new Map();

return startServerSideBuild(async (type, data): Promise<boolean> => {
const onEvent = async (type, data): Promise<boolean> => {
const onStartVertex = (id: string) => {
verticesStartTimeMs.set(id, Date.now());
useFlowStore.getState().updateBuildStatus([id], BuildStatus.TO_BUILD);
Expand Down Expand Up @@ -302,7 +259,23 @@ export async function buildFlowVertices({
}
}
return true
});
};
return performStreamingRequest({
method: "POST",
url,
body: postData,
onData: async (event) => {
const type = event["event"];
const data = event["data"];
return await onEvent(type, data);
},
onError: (statusCode) => {
if (statusCode === 404) {
throw new Error("endpoint not available")
}
throw new Error("error in streaming request")
}
})
}

export async function buildVertices({
Expand Down

0 comments on commit 205fdd7

Please sign in to comment.