Skip to content

Commit

Permalink
export stremaing module
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Jul 31, 2024
1 parent 205fdd7 commit bc79582
Show file tree
Hide file tree
Showing 4 changed files with 499 additions and 499 deletions.
119 changes: 59 additions & 60 deletions src/frontend/src/controllers/API/api.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -195,82 +195,81 @@ function ApiInterceptor() {
}

export type StreamingRequestParams = {
method: string,
url: string,
onData: (event: object) => Promise<boolean>,
body?: object,
onError?: (statusCode: number) => void,
}
method: string;
url: string;
onData: (event: object) => Promise<boolean>;
body?: object;
onError?: (statusCode: number) => void;
};

async function performStreamingRequest({
method,
url,
onData,
body,
onError}: StreamingRequestParams) {

let headers = {
"Content-Type": "application/json",
// this flag is fundamental to ensure server stops tasks when client disconnects
"Connection": "close"
};
const accessToken = cookies.get(LANGFLOW_ACCESS_TOKEN);
if (accessToken) {
headers["Authorization"] = `Bearer ${accessToken}`;
}
const params = {
method: method,
headers: headers
method,
url,
onData,
body,
onError,
}: StreamingRequestParams) {
let headers = {
"Content-Type": "application/json",
// this flag is fundamental to ensure server stops tasks when client disconnects
Connection: "close",
};
const accessToken = cookies.get(LANGFLOW_ACCESS_TOKEN);
if (accessToken) {
headers["Authorization"] = `Bearer ${accessToken}`;
}
const params = {
method: method,
headers: headers,
};
if (body) {
params["body"] = JSON.stringify(body)
params["body"] = JSON.stringify(body);
}
const response = await fetch(url, params)
const response = await fetch(url, params);
if (!response.ok) {
if (onError) {
onError(response.status)
} else {
throw new Error("error in streaming request")
}
if (onError) {
onError(response.status);
} else {
throw new Error("error in streaming request");
}
}
if (response.body === null) {
return
return;
}
let current: string[] = []
let current: string[] = [];
let textDecoder = new TextDecoder();

for await (const chunk of response.body) {
const decodedChunk = await textDecoder.decode(chunk);
let all = decodedChunk.split("\n\n");
for (const string of all) {
if (string.endsWith("}")) {
const allString = current.join("") + string
let data: object
try {
data = JSON.parse(allString)
current = []
} catch (e) {
current.push(string)
continue
}
const shouldContinue = await onData(data)
if (!shouldContinue) {
current = []
break
}
} else {
current.push(string)
}
const decodedChunk = await textDecoder.decode(chunk);
let all = decodedChunk.split("\n\n");
for (const string of all) {
if (string.endsWith("}")) {
const allString = current.join("") + string;
let data: object;
try {
data = JSON.parse(allString);
current = [];
} catch (e) {
current.push(string);
continue;
}
const shouldContinue = await onData(data);
if (!shouldContinue) {
current = [];
break;
}
} else {
current.push(string);
}
}
}
if (current.length > 0) {
const allString = current.join("")
if (allString) {
const data = JSON.parse(current.join(""))
await onData(data)
}
const allString = current.join("");
if (allString) {
const data = JSON.parse(current.join(""));
await onData(data);
}
}

}

export { ApiInterceptor, api, performStreamingRequest };
2 changes: 1 addition & 1 deletion src/frontend/src/controllers/API/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ export async function getVerticesOrder(
data["data"]["nodes"] = nodes;
data["data"]["edges"] = Edges;
}
return await api.post(
return await api.post(
`${BASE_URL_API}build/${flowId}/vertices`,
data,
config,
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/stores/flowStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ import {
targetHandleType,
} from "../types/flow";
import { FlowStoreType, VertexLayerElementType } from "../types/zustand/flow";
import {buildVertices, buildFlowVertices, buildFlowVerticesWithFallback} from "../utils/buildUtils";
import {
buildFlowVertices,
buildFlowVerticesWithFallback,
buildVertices,
} from "../utils/buildUtils";
import {
checkChatInput,
checkOldComponents,
Expand Down
Loading

0 comments on commit bc79582

Please sign in to comment.