From 388bf7ad484845e0ea609c67162adb13cf3e30ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 29 Jul 2024 13:51:02 +0200 Subject: [PATCH 1/9] feat: ui build in one single http request --- src/backend/base/langflow/api/utils.py | 7 +- src/backend/base/langflow/api/v1/chat.py | 301 +++++++++++++++++- src/backend/base/langflow/api/v1/schemas.py | 1 + src/backend/base/langflow/graph/graph/base.py | 10 +- src/frontend/src/controllers/API/api.tsx | 84 ++++- .../components/chatView/chatMessage/index.tsx | 1 - src/frontend/src/stores/flowStore.ts | 22 +- src/frontend/src/types/zustand/flow/index.ts | 4 +- src/frontend/src/utils/buildUtils.ts | 175 +++++++++- 9 files changed, 574 insertions(+), 31 deletions(-) diff --git a/src/backend/base/langflow/api/utils.py b/src/backend/base/langflow/api/utils.py index 55b0918b8d8c..d3204504b99a 100644 --- a/src/backend/base/langflow/api/utils.py +++ b/src/backend/base/langflow/api/utils.py @@ -122,7 +122,7 @@ def format_elapsed_time(elapsed_time: float) -> str: return f"{minutes} {minutes_unit}, {seconds} {seconds_unit}" -async def build_graph_from_db(flow_id: str, session: Session, chat_service: "ChatService"): +async def build_graph_from_db_no_cache(flow_id: str, session: Session): """Build and cache the graph.""" flow: Optional[Flow] = session.get(Flow, flow_id) if not flow or not flow.data: @@ -139,6 +139,11 @@ async def build_graph_from_db(flow_id: str, session: Session, chat_service: "Cha graph.set_run_id(run_id) graph.set_run_name() await graph.initialize_run() + + return graph + +async def build_graph_from_db(flow_id: str, session: Session, chat_service: "ChatService"): + graph = await build_graph_from_db_no_cache(flow_id, session) await chat_service.set_cache(flow_id, graph) return graph diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 8121e8889295..7fe543dda151 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -1,11 +1,18 @@ +import asyncio +import json import time import traceback +import typing import uuid +from asyncio import QueueEmpty from typing import TYPE_CHECKING, Annotated, Optional -from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException +from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request from fastapi.responses import StreamingResponse from loguru import logger +from starlette.background import BackgroundTask +from starlette.responses import ContentStream +from starlette.types import Receive from langflow.api.utils import ( build_and_cache_graph_from_data, @@ -13,7 +20,7 @@ format_elapsed_time, format_exception_message, get_top_level_vertices, - parse_exception, + parse_exception, build_graph_from_db_no_cache, ) from langflow.api.v1.schemas import ( FlowDataRequest, @@ -21,7 +28,7 @@ ResultDataResponse, StreamData, VertexBuildResponse, - VerticesOrderResponse, + VerticesOrderResponse ) from langflow.exceptions.component import ComponentBuildException from langflow.graph.graph.base import Graph @@ -139,6 +146,294 @@ async def retrieve_vertices_order( logger.exception(exc) raise HTTPException(status_code=500, detail=str(exc)) from exc +@router.post("/build/{flow_id}/flow") +async def build_flow( + background_tasks: BackgroundTasks, + flow_id: uuid.UUID, + inputs: Annotated[Optional[InputValueRequest], Body(embed=True)] = None, + data: Annotated[Optional[FlowDataRequest], Body(embed=True)] = None, + files: Optional[list[str]] = None, + stop_component_id: 
Optional[str] = None, + start_component_id: Optional[str] = None, + chat_service: "ChatService" = Depends(get_chat_service), + current_user=Depends(get_current_active_user), + telemetry_service: "TelemetryService" = Depends(get_telemetry_service), + session=Depends(get_session), + +): + + async def build_graph_and_get_order() -> tuple[list[str], list[str], "Graph"]: + start_time = time.perf_counter() + components_count = None + try: + flow_id_str = str(flow_id) + if not data: + graph = await build_graph_from_db_no_cache(flow_id=flow_id_str, session=session) + else: + graph = Graph.from_payload(data.model_dump(), flow_id_str) + graph.validate_stream() + if stop_component_id or start_component_id: + try: + first_layer = graph.sort_vertices(stop_component_id, start_component_id) + except Exception as exc: + logger.error(exc) + first_layer = graph.sort_vertices() + else: + first_layer = graph.sort_vertices() + + for vertex_id in first_layer: + graph.run_manager.add_to_vertices_being_run(vertex_id) + + # Now vertices is a list of lists + # We need to get the id of each vertex + # and return the same structure but only with the ids + components_count = len(graph.vertices) + vertices_to_run = list(graph.vertices_to_run.union(get_top_level_vertices(graph, graph.vertices_to_run))) + background_tasks.add_task( + telemetry_service.log_package_playground, + PlaygroundPayload( + playgroundSeconds=int(time.perf_counter() - start_time), + playgroundComponentCount=components_count, + playgroundSuccess=True, + ), + ) + return first_layer, vertices_to_run, graph + except Exception as exc: + background_tasks.add_task( + telemetry_service.log_package_playground, + PlaygroundPayload( + playgroundSeconds=int(time.perf_counter() - start_time), + playgroundComponentCount=components_count, + playgroundSuccess=False, + playgroundErrorMessage=str(exc), + ), + ) + if "stream or streaming set to True" in str(exc): + raise HTTPException(status_code=400, detail=str(exc)) + logger.error(f"Error checking build status: {exc}") + logger.exception(exc) + raise HTTPException(status_code=500, detail=str(exc)) from exc + + + async def _build_vertex(vertex_id: str, graph: "Graph") -> VertexBuildResponse: + flow_id_str = str(flow_id) + + next_runnable_vertices = [] + top_level_vertices = [] + start_time = time.perf_counter() + error_message = None + try: + vertex = graph.get_vertex(vertex_id) + try: + lock = chat_service._async_cache_locks[flow_id_str] + ( + result_dict, + params, + valid, + artifacts, + vertex, + ) = await graph.build_vertex( + chat_service=None, + vertex_id=vertex_id, + user_id=current_user.id, + inputs_dict=inputs.model_dump() if inputs else {}, + files=files, + ) + next_runnable_vertices = await graph.get_next_runnable_vertices(lock, vertex=vertex, cache=False) + top_level_vertices = graph.get_top_level_vertices(next_runnable_vertices) + + result_data_response = ResultDataResponse.model_validate(result_dict, from_attributes=True) + except Exception as exc: + if isinstance(exc, ComponentBuildException): + params = exc.message + tb = exc.formatted_traceback + else: + tb = traceback.format_exc() + logger.exception(f"Error building Component: {exc}") + params = format_exception_message(exc) + message = {"errorMessage": params, "stackTrace": tb} + valid = False + error_message = params + output_label = vertex.outputs[0]["name"] if vertex.outputs else "output" + outputs = {output_label: OutputValue(message=message, type="error")} + result_data_response = ResultDataResponse(results={}, outputs=outputs) + artifacts = 
{} + background_tasks.add_task(graph.end_all_traces, error=exc) + + result_data_response.message = artifacts + + # Log the vertex build + if not vertex.will_stream: + background_tasks.add_task( + log_vertex_build, + flow_id=flow_id_str, + vertex_id=vertex_id.split("-")[0], + valid=valid, + params=params, + data=result_data_response, + artifacts=artifacts, + ) + + timedelta = time.perf_counter() - start_time + duration = format_elapsed_time(timedelta) + result_data_response.duration = duration + result_data_response.timedelta = timedelta + vertex.add_build_time(timedelta) + inactivated_vertices = list(graph.inactivated_vertices) + graph.reset_inactivated_vertices() + graph.reset_activated_vertices() + # graph.stop_vertex tells us if the user asked + # to stop the build of the graph at a certain vertex + # if it is in next_vertices_ids, we need to remove other + # vertices from next_vertices_ids + if graph.stop_vertex and graph.stop_vertex in next_runnable_vertices: + next_runnable_vertices = [graph.stop_vertex] + + if not graph.run_manager.vertices_being_run and not next_runnable_vertices: + background_tasks.add_task(graph.end_all_traces) + + build_response = VertexBuildResponse( + inactivated_vertices=list(set(inactivated_vertices)), + next_vertices_ids=list(set(next_runnable_vertices)), + top_level_vertices=list(set(top_level_vertices)), + valid=valid, + params=params, + id=vertex.id, + data=result_data_response, + ) + background_tasks.add_task( + telemetry_service.log_package_component, + ComponentPayload( + componentName=vertex_id.split("-")[0], + componentSeconds=int(time.perf_counter() - start_time), + componentSuccess=valid, + componentErrorMessage=error_message, + ), + ) + return build_response + except Exception as exc: + background_tasks.add_task( + telemetry_service.log_package_component, + ComponentPayload( + componentName=vertex_id.split("-")[0], + componentSeconds=int(time.perf_counter() - start_time), + componentSuccess=False, + componentErrorMessage=str(exc), + ), + ) + logger.error(f"Error building Component: \n\n{exc}") + logger.exception(exc) + message = parse_exception(exc) + raise HTTPException(status_code=500, detail=message) from exc + + + def send_event(event_type: str, value: dict, queue: asyncio.Queue) -> None: + json_data = { + "event": event_type, + "data": value + } + event_id = uuid.uuid4() + logger.debug(f"sending event {event_id}: {event_type}") + str_data = json.dumps(json_data) + "\n\n" + queue.put_nowait((event_id, str_data.encode('utf-8'), time.time())) + + async def build_vertices(vertex_id: str, graph: "Graph", queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> None: + build_task = asyncio.create_task(await asyncio.to_thread(_build_vertex, vertex_id, graph)) + try: + await build_task + except asyncio.CancelledError: + build_task.cancel() + return + + vertex_build_response: VertexBuildResponse = build_task.result() + # send built event or error event + send_event("end_vertex", { + "build_data": json.loads(vertex_build_response.model_dump_json())}, + queue) + await client_consumed_queue.get() + if vertex_build_response.valid: + if vertex_build_response.next_vertices_ids: + tasks = [] + for next_vertex_id in vertex_build_response.next_vertices_ids: + task = asyncio.create_task(build_vertices(next_vertex_id, graph, queue, client_consumed_queue)) + tasks.append(task) + try: + await asyncio.gather(*tasks) + except asyncio.CancelledError: + for task in tasks: + task.cancel() + return + + + async def event_generator(queue: asyncio.Queue, 
client_consumed_queue: asyncio.Queue) -> None: + if not data: + # using another thread since the DB query is I/O bound + vertices_task = asyncio.create_task(await asyncio.to_thread(build_graph_and_get_order)) + try: + await vertices_task + except asyncio.CancelledError: + vertices_task.cancel() + return + + ids, vertices_to_run, graph = vertices_task.result() + else: + ids, vertices_to_run, graph = await build_graph_and_get_order() + send_event("vertices_sorted", + { + "ids": ids, + "to_run": vertices_to_run + }, queue) + await client_consumed_queue.get() + + tasks = [] + for vertex_id in ids: + task = asyncio.create_task(build_vertices(vertex_id, graph, queue, client_consumed_queue)) + tasks.append(task) + try: + await asyncio.gather(*tasks) + except asyncio.CancelledError: + for task in tasks: + task.cancel() + return + send_event("end", {}, queue) + await queue.put((None, None, time.time)) + + async def consume_and_yield(queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> None: + while True: + event_id, value, put_time = await queue.get() + if value is None: + break + get_time = time.time() + yield value + get_time_yield = time.time() + client_consumed_queue.put_nowait(event_id) + logger.debug(f"consumed event {str(event_id)} (time in queue, {get_time - put_time:.4f}, client {get_time_yield - get_time:.4f})") + + asyncio_queue = asyncio.Queue() + asyncio_queue_client_consumed = asyncio.Queue() + main_task = asyncio.create_task(event_generator(asyncio_queue, asyncio_queue_client_consumed)) + def on_disconnect(): + logger.debug("Client disconnected, closing tasks") + main_task.cancel() + return DisconnectHandlerStreamingResponse(consume_and_yield(asyncio_queue, asyncio_queue_client_consumed), media_type="application/x-ndjson", on_disconnect=on_disconnect) + +class DisconnectHandlerStreamingResponse(StreamingResponse): + + def __init__(self, + content: ContentStream, status_code: int = 200, headers: typing.Mapping[str, str] | None = None, + media_type: str | None = None, background: BackgroundTask | None = None, on_disconnect: Optional[typing.Callable] = None): + super().__init__(content, status_code, headers, media_type, background) + self.on_disconnect = on_disconnect + + async def listen_for_disconnect(self, receive: Receive) -> None: + while True: + message = await receive() + if message["type"] == "http.disconnect": + if self.on_disconnect: + await self.on_disconnect() + break + + @router.post("/build/{flow_id}/vertices/{vertex_id}") async def build_vertex( diff --git a/src/backend/base/langflow/api/v1/schemas.py b/src/backend/base/langflow/api/v1/schemas.py index cdad9528409c..a83396b7f69b 100644 --- a/src/backend/base/langflow/api/v1/schemas.py +++ b/src/backend/base/langflow/api/v1/schemas.py @@ -304,6 +304,7 @@ class InputValueRequest(BaseModel): ) + class SimplifiedAPIRequest(BaseModel): input_value: Optional[str] = Field(default=None, description="The input value") input_type: Optional[InputType] = Field(default="chat", description="The input type") diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 35db271c272c..512dad3dde48 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -851,7 +851,7 @@ def get_root_of_group_node(self, vertex_id: str) -> Vertex: async def build_vertex( self, - chat_service: ChatService, + chat_service: Optional[ChatService], vertex_id: str, inputs_dict: Optional[Dict[str, str]] = None, files: Optional[list[str]] = None, @@ 
-881,12 +881,13 @@ async def build_vertex( params = "" if vertex.frozen: # Check the cache for the vertex - cached_result = await chat_service.get_cache(key=vertex.id) + cached_result = await chat_service.get_cache(key=vertex.id) if chat_service else CacheMiss if isinstance(cached_result, CacheMiss): await vertex.build( user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files ) - await chat_service.set_cache(key=vertex.id, data=vertex) + if chat_service: + await chat_service.set_cache(key=vertex.id, data=vertex) else: cached_vertex = cached_result["result"] # Now set update the vertex with the cached vertex @@ -903,7 +904,8 @@ async def build_vertex( await vertex.build( user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files ) - await chat_service.set_cache(key=vertex.id, data=vertex) + if chat_service: + await chat_service.set_cache(key=vertex.id, data=vertex) if vertex.result is not None: params = f"{vertex._built_object_repr()}{params}" diff --git a/src/frontend/src/controllers/API/api.tsx b/src/frontend/src/controllers/API/api.tsx index 1a4156a1bee9..cef1cb656300 100644 --- a/src/frontend/src/controllers/API/api.tsx +++ b/src/frontend/src/controllers/API/api.tsx @@ -16,11 +16,12 @@ const api: AxiosInstance = axios.create({ baseURL: "", }); +const cookies = new Cookies(); function ApiInterceptor() { const autoLogin = useAuthStore((state) => state.autoLogin); const setErrorData = useAlertStore((state) => state.setErrorData); let { accessToken, authenticationErrorCount } = useContext(AuthContext); - const cookies = new Cookies(); + const setSaveLoading = useFlowsManagerStore((state) => state.setSaveLoading); const { mutate: mutationLogout } = useLogout(); const { mutate: mutationRenewAccessToken } = useRefreshAccessToken(); @@ -205,4 +206,83 @@ function ApiInterceptor() { return null; } -export { ApiInterceptor, api }; +export type StreamingRequestParams = { + method: string; + url: string; + onData: (event: object) => Promise; + body?: object; + onError?: (statusCode: number) => void; +}; + +async function performStreamingRequest({ + method, + url, + onData, + body, + onError, +}: StreamingRequestParams) { + let headers = { + "Content-Type": "application/json", + // this flag is fundamental to ensure server stops tasks when client disconnects + Connection: "close", + }; + const accessToken = cookies.get(LANGFLOW_ACCESS_TOKEN); + if (accessToken) { + headers["Authorization"] = `Bearer ${accessToken}`; + } + const controller = new AbortController() + const params = { + method: method, + headers: headers, + signal: controller.signal + }; + if (body) { + params["body"] = JSON.stringify(body); + } + let current: string[] = []; + let textDecoder = new TextDecoder(); + const response = await fetch(url, params); + if (!response.ok) { + if (onError) { + onError(response.status); + } else { + throw new Error("error in streaming request"); + } + } + if (response.body === null) { + return; + } + for await (const chunk of response.body) { + const decodedChunk = await textDecoder.decode(chunk); + let all = decodedChunk.split("\n\n"); + for (const string of all) { + if (string.endsWith("}")) { + const allString = current.join("") + string; + let data: object; + try { + data = JSON.parse(allString); + current = []; + } catch (e) { + current.push(string); + continue; + } + const shouldContinue = await onData(data); + if (!shouldContinue) { + controller.abort(); + return + } + } else { + current.push(string); + } + } + } + if 
(current.length > 0) { + const allString = current.join(""); + if (allString) { + const data = JSON.parse(current.join("")); + await onData(data); + } + } +} + +export { ApiInterceptor, api, performStreamingRequest }; diff --git a/src/frontend/src/modals/IOModal/components/chatView/chatMessage/index.tsx b/src/frontend/src/modals/IOModal/components/chatView/chatMessage/index.tsx index 458d19e6baf3..e1739be6196c 100644 --- a/src/frontend/src/modals/IOModal/components/chatView/chatMessage/index.tsx +++ b/src/frontend/src/modals/IOModal/components/chatView/chatMessage/index.tsx @@ -38,7 +38,6 @@ export default function ChatMessage({ const [chatMessage, setChatMessage] = useState(chatMessageString); const [isStreaming, setIsStreaming] = useState(false); const eventSource = useRef(undefined); - const updateFlowPool = useFlowStore((state) => state.updateFlowPool); const setErrorData = useAlertStore((state) => state.setErrorData); const chatMessageRef = useRef(chatMessage); diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index cbc431defb87..6c963458b94d 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -29,7 +29,7 @@ import { targetHandleType, } from "../types/flow"; import { FlowStoreType, VertexLayerElementType } from "../types/zustand/flow"; -import { buildVertices } from "../utils/buildUtils"; +import { buildFlowVerticesWithFallback } from "../utils/buildUtils"; import { checkChatInput, checkOldComponents, @@ -607,20 +607,8 @@ const useFlowStore = create((set, get) => ({ ); useFlowStore.getState().updateBuildStatus([vertexBuildData.id], status); - - const verticesIds = get().verticesBuild?.verticesIds; - const newFlowBuildStatus = { ...get().flowBuildStatus }; - // filter out the vertices that are not status - - const verticesToUpdate = verticesIds?.filter( - (id) => newFlowBuildStatus[id]?.status !== BuildStatus.BUILT, - ); - - if (verticesToUpdate) { - useFlowStore.getState().updateBuildStatus(verticesToUpdate, status); - } } - await buildVertices({ + await buildFlowVerticesWithFallback({ input_value, files, flowId: currentFlow!.id, @@ -672,8 +660,8 @@ const useFlowStore = create((set, get) => ({ useFlowStore.getState().updateBuildStatus(idList, BuildStatus.BUILDING); }, onValidateNodes: validateSubgraph, - nodes: !get().onFlowPage ? get().nodes : undefined, - edges: !get().onFlowPage ? get().edges : undefined, + nodes: get().onFlowPage ? get().nodes : undefined, + edges: get().onFlowPage ? 
get().edges : undefined, }); get().setIsBuilding(false); get().setLockChat(false); @@ -690,7 +678,7 @@ const useFlowStore = create((set, get) => ({ vertices: { verticesIds: string[]; verticesLayers: VertexLayerElementType[][]; - runId: string; + runId?: string; verticesToRun: string[]; } | null, ) => { diff --git a/src/frontend/src/types/zustand/flow/index.ts b/src/frontend/src/types/zustand/flow/index.ts index 631537939dc0..500a5aeb87d5 100644 --- a/src/frontend/src/types/zustand/flow/index.ts +++ b/src/frontend/src/types/zustand/flow/index.ts @@ -147,7 +147,7 @@ export type FlowStoreType = { vertices: { verticesIds: string[]; verticesLayers: VertexLayerElementType[][]; - runId: string; + runId?: string; verticesToRun: string[]; } | null, ) => void; @@ -156,7 +156,7 @@ export type FlowStoreType = { verticesBuild: { verticesIds: string[]; verticesLayers: VertexLayerElementType[][]; - runId: string; + runId?: string; verticesToRun: string[]; } | null; updateBuildStatus: (nodeId: string[], status: BuildStatus) => void; diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index ad9ac48688fc..969138598ee4 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -1,3 +1,5 @@ +import { BASE_URL_API } from "@/constants/constants"; +import { performStreamingRequest } from "@/controllers/API/api"; import { AxiosError } from "axios"; import { Edge, Node } from "reactflow"; import { BuildStatus } from "../constants/enums"; @@ -66,7 +68,7 @@ export async function updateVerticesOrder( ): Promise<{ verticesLayers: VertexLayerElementType[][]; verticesIds: string[]; - runId: string; + runId?: string; verticesToRun: string[]; }> { return new Promise(async (resolve, reject) => { @@ -115,6 +117,176 @@ export async function updateVerticesOrder( }); } +export async function buildFlowVerticesWithFallback( + params: BuildVerticesParams, +) { + try { + return await buildFlowVertices(params); + } catch (e: any) { + if (e.message === "endpoint not available") { + return await buildVertices(params); + } + throw e; + } +} + +const MIN_VISUAL_BUILD_TIME_MS = 300; + +export async function buildFlowVertices({ + flowId, + input_value, + files, + startNodeId, + stopNodeId, + onGetOrderSuccess, + onBuildUpdate, + onBuildComplete, + onBuildError, + onBuildStart, + onValidateNodes, + nodes, + edges, + setLockChat, +}: BuildVerticesParams) { + let url = `${BASE_URL_API}build/${flowId}/flow?`; + if (startNodeId) { + url = `${url}&start_component_id=${startNodeId}`; + } + if (stopNodeId) { + url = `${url}&stop_component_id=${stopNodeId}`; + } + const postData = {}; + if (typeof input_value !== "undefined") { + postData["inputs"] = { input_value: input_value }; + } + if (files) { + postData["files"] = files; + } + if (nodes) { + postData["data"] = { + nodes, + edges, + }; + } + + const buildResults: Array = []; + + const verticesStartTimeMs: Map = new Map(); + + const onEvent = async (type, data): Promise => { + const onStartVertices = (ids: Array) => { + useFlowStore.getState().updateBuildStatus(ids, BuildStatus.TO_BUILD); + if (onBuildStart) + onBuildStart(ids.map((id) => ({ id: id, reference: id }))); + ids.forEach((id) => verticesStartTimeMs.set(id, Date.now())); + }; + switch (type) { + case "vertices_sorted": { + const verticesToRun = data.to_run; + const verticesIds = data.ids; + + onStartVertices(verticesIds); + + let verticesLayers: Array> = + verticesIds.map((id: string) => { + return [{ id: id, reference: id }]; + }); + + 
useFlowStore.getState().updateVerticesBuild({ + verticesLayers, + verticesIds, + verticesToRun, + }); + if (onValidateNodes) { + try { + onValidateNodes(data.to_run); + if (onGetOrderSuccess) onGetOrderSuccess(); + useFlowStore.getState().setIsBuilding(true); + return true; + } catch (e) { + useFlowStore.getState().setIsBuilding(false); + setLockChat && setLockChat(false); + return false; + } + } + return true; + } + case "end_vertex": { + const buildData = data.build_data; + const startTimeMs = verticesStartTimeMs.get(buildData.id); + if (startTimeMs) { + const delta = Date.now() - startTimeMs; + if (delta < MIN_VISUAL_BUILD_TIME_MS) { + // this is a visual trick to make the build process look more natural + await new Promise((resolve) => + setTimeout(resolve, MIN_VISUAL_BUILD_TIME_MS - delta), + ); + } + } + + if (onBuildUpdate) { + if (!buildData.valid) { + // lots is a dictionary with the key the output field name and the value the log object + // logs: { [key: string]: { message: any; type: string }[] }; + const errorMessages = Object.keys(buildData.data.outputs).map( + (key) => { + const outputs = buildData.data.outputs[key]; + if (Array.isArray(outputs)) { + return outputs + .filter((log) => isErrorLogType(log.message)) + .map((log) => log.message.errorMessage); + } + if (!isErrorLogType(outputs.message)) { + return []; + } + return [outputs.message.errorMessage]; + }, + ); + onBuildError!("Error Building Component", errorMessages, [ + { id: buildData.id }, + ]); + onBuildUpdate(buildData, BuildStatus.ERROR, ""); + buildResults.push(false); + return false; + } else { + onBuildUpdate(buildData, BuildStatus.BUILT, ""); + buildResults.push(true); + } + } + if (buildData.next_vertices_ids) { + onStartVertices(buildData.next_vertices_ids); + } + return true; + } + case "end": { + const allNodesValid = buildResults.every((result) => result); + onBuildComplete!(allNodesValid); + useFlowStore.getState().setIsBuilding(false); + return true; + } + default: + return true; + } + return true; + }; + return performStreamingRequest({ + method: "POST", + url, + body: postData, + onData: async (event) => { + const type = event["event"]; + const data = event["data"]; + return await onEvent(type, data); + }, + onError: (statusCode) => { + if (statusCode === 404) { + throw new Error("endpoint not available"); + } + throw new Error("error in streaming request"); + }, + }); +} + export async function buildVertices({ flowId, input_value, @@ -252,6 +424,7 @@ export async function buildVertices({ useFlowStore.getState().setIsBuilding(false); } } + async function buildVertex({ flowId, id, From ddc6ab45a506e82a20b037946d32b1d6ba3f1309 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Thu, 1 Aug 2024 19:06:40 +0200 Subject: [PATCH 2/9] fix use session_id --- src/backend/base/langflow/api/utils.py | 17 ++++++++++------- src/backend/base/langflow/api/v1/chat.py | 4 ++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/backend/base/langflow/api/utils.py b/src/backend/base/langflow/api/utils.py index d3204504b99a..50c8e13209c7 100644 --- a/src/backend/base/langflow/api/utils.py +++ b/src/backend/base/langflow/api/utils.py @@ -1,6 +1,6 @@ import uuid import warnings -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Dict from fastapi import HTTPException from sqlmodel import Session @@ -122,12 +122,9 @@ def format_elapsed_time(elapsed_time: float) -> str: return f"{minutes} {minutes_unit}, {seconds} {seconds_unit}" -async def 
build_graph_from_db_no_cache(flow_id: str, session: Session): +async def build_graph_from_data(flow_id: str, payload: Dict, **kwargs): """Build and cache the graph.""" - flow: Optional[Flow] = session.get(Flow, flow_id) - if not flow or not flow.data: - raise ValueError("Invalid flow ID") - graph = Graph.from_payload(flow.data, flow_id, flow_name=flow.name, user_id=str(flow.user_id)) + graph = Graph.from_payload(payload, flow_id, **kwargs) for vertex_id in graph._has_session_id_vertices: vertex = graph.get_vertex(vertex_id) if vertex is None: @@ -139,9 +136,15 @@ async def build_graph_from_db_no_cache(flow_id: str, session: Session): graph.set_run_id(run_id) graph.set_run_name() await graph.initialize_run() - return graph +async def build_graph_from_db_no_cache(flow_id: str, session: Session): + """Build and cache the graph.""" + flow: Optional[Flow] = session.get(Flow, flow_id) + if not flow or not flow.data: + raise ValueError("Invalid flow ID") + return await build_graph_from_data(flow_id, flow.data, flow_name=flow.name, user_id=str(flow.user_id)) + async def build_graph_from_db(flow_id: str, session: Session, chat_service: "ChatService"): graph = await build_graph_from_db_no_cache(flow_id, session) await chat_service.set_cache(flow_id, graph) diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 7fe543dda151..c00639963835 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -20,7 +20,7 @@ format_elapsed_time, format_exception_message, get_top_level_vertices, - parse_exception, build_graph_from_db_no_cache, + parse_exception, build_graph_from_db_no_cache, build_graph_from_data, ) from langflow.api.v1.schemas import ( FlowDataRequest, @@ -170,7 +170,7 @@ async def build_graph_and_get_order() -> tuple[list[str], list[str], "Graph"]: if not data: graph = await build_graph_from_db_no_cache(flow_id=flow_id_str, session=session) else: - graph = Graph.from_payload(data.model_dump(), flow_id_str) + graph = await build_graph_from_data(flow_id_str, data.model_dump()) graph.validate_stream() if stop_component_id or start_component_id: try: From 7b8f5ec7c05d584dbf085754e19d11df438a015d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 2 Aug 2024 10:42:29 +0200 Subject: [PATCH 3/9] fix frozen --- src/backend/base/langflow/graph/graph/base.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 512dad3dde48..b5f9df8daf47 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -880,15 +880,11 @@ async def build_vertex( try: params = "" if vertex.frozen: - # Check the cache for the vertex - cached_result = await chat_service.get_cache(key=vertex.id) if chat_service else CacheMiss - if isinstance(cached_result, CacheMiss): - await vertex.build( - user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files - ) - if chat_service: - await chat_service.set_cache(key=vertex.id, data=vertex) + if chat_service: + cached_result = await chat_service.get_cache(key=vertex.id) else: + cached_result = None + if cached_result and not isinstance(cached_result, CacheMiss): cached_vertex = cached_result["result"] # Now set update the vertex with the cached vertex vertex._built = cached_vertex._built @@ -899,7 +895,12 @@ async def build_vertex( vertex._custom_component = 
cached_vertex._custom_component if vertex.result is not None: vertex.result.used_frozen_result = True - + else: + await vertex.build( + user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files + ) + if chat_service: + await chat_service.set_cache(key=vertex.id, data=vertex) else: await vertex.build( user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files From acc9ada86dfb92987c07f3ff248f708de15e63a0 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 2 Aug 2024 08:43:13 +0000 Subject: [PATCH 4/9] [autofix.ci] apply automated fixes --- src/backend/base/langflow/api/utils.py | 2 + src/backend/base/langflow/api/v1/chat.py | 63 +++++++++++---------- src/backend/base/langflow/api/v1/schemas.py | 1 - 3 files changed, 35 insertions(+), 31 deletions(-) diff --git a/src/backend/base/langflow/api/utils.py b/src/backend/base/langflow/api/utils.py index 50c8e13209c7..b16ab1324f2f 100644 --- a/src/backend/base/langflow/api/utils.py +++ b/src/backend/base/langflow/api/utils.py @@ -138,6 +138,7 @@ async def build_graph_from_data(flow_id: str, payload: Dict, **kwargs): await graph.initialize_run() return graph + async def build_graph_from_db_no_cache(flow_id: str, session: Session): """Build and cache the graph.""" flow: Optional[Flow] = session.get(Flow, flow_id) @@ -145,6 +146,7 @@ async def build_graph_from_db_no_cache(flow_id: str, session: Session): raise ValueError("Invalid flow ID") return await build_graph_from_data(flow_id, flow.data, flow_name=flow.name, user_id=str(flow.user_id)) + async def build_graph_from_db(flow_id: str, session: Session, chat_service: "ChatService"): graph = await build_graph_from_db_no_cache(flow_id, session) await chat_service.set_cache(flow_id, graph) diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index c00639963835..51713127474a 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -4,10 +4,9 @@ import traceback import typing import uuid -from asyncio import QueueEmpty from typing import TYPE_CHECKING, Annotated, Optional -from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request +from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException from fastapi.responses import StreamingResponse from loguru import logger from starlette.background import BackgroundTask @@ -20,7 +19,9 @@ format_elapsed_time, format_exception_message, get_top_level_vertices, - parse_exception, build_graph_from_db_no_cache, build_graph_from_data, + parse_exception, + build_graph_from_db_no_cache, + build_graph_from_data, ) from langflow.api.v1.schemas import ( FlowDataRequest, @@ -28,7 +29,7 @@ ResultDataResponse, StreamData, VertexBuildResponse, - VerticesOrderResponse + VerticesOrderResponse, ) from langflow.exceptions.component import ComponentBuildException from langflow.graph.graph.base import Graph @@ -146,6 +147,7 @@ async def retrieve_vertices_order( logger.exception(exc) raise HTTPException(status_code=500, detail=str(exc)) from exc + @router.post("/build/{flow_id}/flow") async def build_flow( background_tasks: BackgroundTasks, @@ -159,9 +161,7 @@ async def build_flow( current_user=Depends(get_current_active_user), telemetry_service: "TelemetryService" = Depends(get_telemetry_service), session=Depends(get_session), - ): - async def build_graph_and_get_order() -> tuple[list[str], list[str], "Graph"]: start_time = 
time.perf_counter() components_count = None @@ -214,7 +214,6 @@ async def build_graph_and_get_order() -> tuple[list[str], list[str], "Graph"]: logger.exception(exc) raise HTTPException(status_code=500, detail=str(exc)) from exc - async def _build_vertex(vertex_id: str, graph: "Graph") -> VertexBuildResponse: flow_id_str = str(flow_id) @@ -326,18 +325,16 @@ async def _build_vertex(vertex_id: str, graph: "Graph") -> VertexBuildResponse: message = parse_exception(exc) raise HTTPException(status_code=500, detail=message) from exc - def send_event(event_type: str, value: dict, queue: asyncio.Queue) -> None: - json_data = { - "event": event_type, - "data": value - } + json_data = {"event": event_type, "data": value} event_id = uuid.uuid4() logger.debug(f"sending event {event_id}: {event_type}") str_data = json.dumps(json_data) + "\n\n" - queue.put_nowait((event_id, str_data.encode('utf-8'), time.time())) + queue.put_nowait((event_id, str_data.encode("utf-8"), time.time())) - async def build_vertices(vertex_id: str, graph: "Graph", queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> None: + async def build_vertices( + vertex_id: str, graph: "Graph", queue: asyncio.Queue, client_consumed_queue: asyncio.Queue + ) -> None: build_task = asyncio.create_task(await asyncio.to_thread(_build_vertex, vertex_id, graph)) try: await build_task @@ -347,9 +344,7 @@ async def build_vertices(vertex_id: str, graph: "Graph", queue: asyncio.Queue, c vertex_build_response: VertexBuildResponse = build_task.result() # send built event or error event - send_event("end_vertex", { - "build_data": json.loads(vertex_build_response.model_dump_json())}, - queue) + send_event("end_vertex", {"build_data": json.loads(vertex_build_response.model_dump_json())}, queue) await client_consumed_queue.get() if vertex_build_response.valid: if vertex_build_response.next_vertices_ids: @@ -364,7 +359,6 @@ async def build_vertices(vertex_id: str, graph: "Graph", queue: asyncio.Queue, c task.cancel() return - async def event_generator(queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> None: if not data: # using another thread since the DB query is I/O bound @@ -378,11 +372,7 @@ async def event_generator(queue: asyncio.Queue, client_consumed_queue: asyncio.Q ids, vertices_to_run, graph = vertices_task.result() else: ids, vertices_to_run, graph = await build_graph_and_get_order() - send_event("vertices_sorted", - { - "ids": ids, - "to_run": vertices_to_run - }, queue) + send_event("vertices_sorted", {"ids": ids, "to_run": vertices_to_run}, queue) await client_consumed_queue.get() tasks = [] @@ -407,21 +397,35 @@ async def consume_and_yield(queue: asyncio.Queue, client_consumed_queue: asyncio yield value get_time_yield = time.time() client_consumed_queue.put_nowait(event_id) - logger.debug(f"consumed event {str(event_id)} (time in queue, {get_time - put_time:.4f}, client {get_time_yield - get_time:.4f})") + logger.debug( + f"consumed event {str(event_id)} (time in queue, {get_time - put_time:.4f}, client {get_time_yield - get_time:.4f})" + ) asyncio_queue = asyncio.Queue() asyncio_queue_client_consumed = asyncio.Queue() main_task = asyncio.create_task(event_generator(asyncio_queue, asyncio_queue_client_consumed)) + def on_disconnect(): logger.debug("Client disconnected, closing tasks") main_task.cancel() - return DisconnectHandlerStreamingResponse(consume_and_yield(asyncio_queue, asyncio_queue_client_consumed), media_type="application/x-ndjson", on_disconnect=on_disconnect) -class 
DisconnectHandlerStreamingResponse(StreamingResponse): + return DisconnectHandlerStreamingResponse( + consume_and_yield(asyncio_queue, asyncio_queue_client_consumed), + media_type="application/x-ndjson", + on_disconnect=on_disconnect, + ) - def __init__(self, - content: ContentStream, status_code: int = 200, headers: typing.Mapping[str, str] | None = None, - media_type: str | None = None, background: BackgroundTask | None = None, on_disconnect: Optional[typing.Callable] = None): + +class DisconnectHandlerStreamingResponse(StreamingResponse): + def __init__( + self, + content: ContentStream, + status_code: int = 200, + headers: typing.Mapping[str, str] | None = None, + media_type: str | None = None, + background: BackgroundTask | None = None, + on_disconnect: Optional[typing.Callable] = None, + ): super().__init__(content, status_code, headers, media_type, background) self.on_disconnect = on_disconnect @@ -434,7 +438,6 @@ async def listen_for_disconnect(self, receive: Receive) -> None: break - @router.post("/build/{flow_id}/vertices/{vertex_id}") async def build_vertex( flow_id: uuid.UUID, diff --git a/src/backend/base/langflow/api/v1/schemas.py b/src/backend/base/langflow/api/v1/schemas.py index a83396b7f69b..cdad9528409c 100644 --- a/src/backend/base/langflow/api/v1/schemas.py +++ b/src/backend/base/langflow/api/v1/schemas.py @@ -304,7 +304,6 @@ class InputValueRequest(BaseModel): ) - class SimplifiedAPIRequest(BaseModel): input_value: Optional[str] = Field(default=None, description="The input value") input_type: Optional[InputType] = Field(default="chat", description="The input type") From 3bd2b9c85924a2c12829c0a7de12bb49a0eb6cba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 2 Aug 2024 11:11:53 +0200 Subject: [PATCH 5/9] prettier --- src/frontend/src/controllers/API/api.tsx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/frontend/src/controllers/API/api.tsx b/src/frontend/src/controllers/API/api.tsx index cef1cb656300..00d38690a6ef 100644 --- a/src/frontend/src/controllers/API/api.tsx +++ b/src/frontend/src/controllers/API/api.tsx @@ -230,11 +230,11 @@ async function performStreamingRequest({ if (accessToken) { headers["Authorization"] = `Bearer ${accessToken}`; } - const controller = new AbortController() + const controller = new AbortController(); const params = { method: method, headers: headers, - signal: controller.signal + signal: controller.signal, }; if (body) { params["body"] = JSON.stringify(body); @@ -269,7 +269,7 @@ async function performStreamingRequest({ const shouldContinue = await onData(data); if (!shouldContinue) { controller.abort(); - return + return; } } else { current.push(string); From 7bf5a0aa4ea2a2821e871ce8a1618756bbc8d958 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 2 Aug 2024 14:17:41 +0200 Subject: [PATCH 6/9] add tests --- .../langflow/services/telemetry/service.py | 22 +++-- src/backend/tests/conftest.py | 7 ++ src/backend/tests/unit/test_chat_endpoint.py | 86 +++++++++++++++++++ 3 files changed, 108 insertions(+), 7 deletions(-) create mode 100644 src/backend/tests/unit/test_chat_endpoint.py diff --git a/src/backend/base/langflow/services/telemetry/service.py b/src/backend/base/langflow/services/telemetry/service.py index 7a26c9a1a294..59b95ec6fcc7 100644 --- a/src/backend/base/langflow/services/telemetry/service.py +++ b/src/backend/base/langflow/services/telemetry/service.py @@ -34,6 +34,7 @@ def __init__(self, settings_service: "SettingsService"): 
self.telemetry_queue: asyncio.Queue = asyncio.Queue() self.client = httpx.AsyncClient(timeout=10.0) # Set a reasonable timeout self.running = False + self._stopping = False self.ot = OpenTelemetry(prometheus_enabled=settings_service.settings.prometheus_enabled) @@ -75,11 +76,16 @@ async def send_telemetry_data(self, payload: BaseModel, path: str | None = None) logger.error(f"Unexpected error occurred: {e}") async def log_package_run(self, payload: RunPayload): - await self.telemetry_queue.put((self.send_telemetry_data, payload, "run")) + await self._queue_event((self.send_telemetry_data, payload, "run")) async def log_package_shutdown(self): payload = ShutdownPayload(timeRunning=(datetime.now(timezone.utc) - self._start_time).seconds) - await self.telemetry_queue.put((self.send_telemetry_data, payload, "shutdown")) + await self._queue_event(payload) + + async def _queue_event(self, payload): + if self.do_not_track or self._stopping: + return + await self.telemetry_queue.put(payload) async def log_package_version(self): python_version = ".".join(platform.python_version().split(".")[:2]) @@ -95,13 +101,13 @@ async def log_package_version(self): arch=architecture, autoLogin=self.settings_service.auth_settings.AUTO_LOGIN, ) - await self.telemetry_queue.put((self.send_telemetry_data, payload, None)) + await self._queue_event((self.send_telemetry_data, payload, None)) async def log_package_playground(self, payload: PlaygroundPayload): - await self.telemetry_queue.put((self.send_telemetry_data, payload, "playground")) + await self._queue_event((self.send_telemetry_data, payload, "playground")) async def log_package_component(self, payload: ComponentPayload): - await self.telemetry_queue.put((self.send_telemetry_data, payload, "component")) + await self._queue_event((self.send_telemetry_data, payload, "component")) async def start(self): if self.running or self.do_not_track: @@ -123,11 +129,13 @@ async def flush(self): logger.error(f"Error flushing logs: {e}") async def stop(self): - if self.do_not_track: + if self.do_not_track or self._stopping: return try: - self.running = False + self._stopping = True + # flush all the remaining events and then stop await self.flush() + self.running = False if self.worker_task: self.worker_task.cancel() with contextlib.suppress(asyncio.CancelledError): diff --git a/src/backend/tests/conftest.py b/src/backend/tests/conftest.py index 11a3fa2b7d6f..a328ffe67106 100644 --- a/src/backend/tests/conftest.py +++ b/src/backend/tests/conftest.py @@ -53,6 +53,7 @@ def pytest_configure(config): pytest.TWO_OUTPUTS = data_path / "TwoOutputsTest.json" pytest.VECTOR_STORE_PATH = data_path / "Vector_store.json" pytest.SIMPLE_API_TEST = data_path / "SimpleAPITest.json" + pytest.MEMORY_CHATBOT_NO_LLM = data_path / "MemoryChatbotNoLLM.json" pytest.CODE_WITH_SYNTAX_ERROR = """ def get_text(): retun "Hello World" @@ -70,6 +71,7 @@ def get_text(): pytest.CHAT_INPUT, pytest.TWO_OUTPUTS, pytest.VECTOR_STORE_PATH, + pytest.MEMORY_CHATBOT_NO_LLM ]: assert path.exists(), f"File {path} does not exist. 
Available files: {list(data_path.iterdir())}" @@ -232,6 +234,11 @@ def json_webhook_test(): return f.read() +@pytest.fixture +def json_memory_chatbot_no_llm(): + with open(pytest.MEMORY_CHATBOT_NO_LLM, "r") as f: + return f.read() + @pytest.fixture(name="client", autouse=True) def client_fixture(session: Session, monkeypatch, request, load_flows_dir): # Set the database url to a test database diff --git a/src/backend/tests/unit/test_chat_endpoint.py b/src/backend/tests/unit/test_chat_endpoint.py new file mode 100644 index 000000000000..23c2e37494ce --- /dev/null +++ b/src/backend/tests/unit/test_chat_endpoint.py @@ -0,0 +1,86 @@ +import json +from uuid import UUID +from orjson import orjson + +from langflow.memory import get_messages +from langflow.services.database.models.flow import FlowCreate, FlowUpdate + + +def test_build_flow(client, json_memory_chatbot_no_llm, logged_in_headers): + flow_id = _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) + + with client.stream("POST", f"api/v1/build/{flow_id}/flow", json={}, headers=logged_in_headers) as r: + consume_and_assert_stream(r) + + check_messages(flow_id) + +def test_build_flow_from_request_data(client, json_memory_chatbot_no_llm, logged_in_headers): + flow_id = _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) + flow_data = client.get("api/v1/flows/" + str(flow_id), headers=logged_in_headers).json() + + with client.stream("POST", f"api/v1/build/{flow_id}/flow", json={ + "data": flow_data["data"] + }, headers=logged_in_headers) as r: + consume_and_assert_stream(r) + + check_messages(flow_id) + + +def test_build_flow_with_frozen_path(client, json_memory_chatbot_no_llm, logged_in_headers): + flow_id = _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) + + flow_data = client.get("api/v1/flows/" + str(flow_id), headers=logged_in_headers).json() + flow_data["data"]["nodes"][0]["data"]["node"]["frozen"] = True + response = client.patch("api/v1/flows/" + str(flow_id), json=FlowUpdate(name="Flow", description="description", data=flow_data["data"]).model_dump(), headers=logged_in_headers) + response.raise_for_status() + + with client.stream("POST", f"api/v1/build/{flow_id}/flow", json={}, headers=logged_in_headers) as r: + consume_and_assert_stream(r) + + check_messages(flow_id) + +def check_messages(flow_id): + messages = get_messages(flow_id=UUID(flow_id), order="ASC") + assert len(messages) == 2 + assert messages[0].session_id == flow_id + assert messages[0].sender == "User" + assert messages[0].sender_name == "User" + assert messages[0].text == "" + assert messages[1].session_id == flow_id + assert messages[1].sender == "Machine" + assert messages[1].sender_name == "AI" + +def consume_and_assert_stream(r): + count = 0 + for line in r.iter_lines(): + # httpx split by \n, but ndjson sends two \n for each line + if not line: + continue + parsed = json.loads(line) + if count == 0: + assert parsed["event"] == "vertices_sorted" + ids = parsed["data"]["ids"] + ids.sort() + assert ids == ['ChatInput-CIGht', 'Memory-amN4Z'] + + to_run = parsed["data"]["to_run"] + to_run.sort() + assert to_run == ['ChatInput-CIGht', 'ChatOutput-QA7ej', 'Memory-amN4Z', 'Prompt-iWbCC'] + elif count > 0 and count < 5: + assert parsed["event"] == "end_vertex" + assert parsed["data"]["build_data"] is not None + elif count == 5: + assert parsed["event"] == "end" + else: + raise ValueError(f"Unexpected line: {line}") + count += 1 + + +def _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers): + vector_store = 
orjson.loads(json_memory_chatbot_no_llm) + data = vector_store["data"] + vector_store = FlowCreate(name="Flow", description="description", data=data, endpoint_name="f") + response = client.post("api/v1/flows/", json=vector_store.model_dump(), headers=logged_in_headers) + response.raise_for_status() + flow_id = response.json()["id"] + return flow_id From 96ed47a92eee6946913f7d0584f86b27706808c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 2 Aug 2024 14:17:48 +0200 Subject: [PATCH 7/9] add tests --- src/backend/tests/data/MemoryChatbotNoLLM.json | 1 + 1 file changed, 1 insertion(+) create mode 100644 src/backend/tests/data/MemoryChatbotNoLLM.json diff --git a/src/backend/tests/data/MemoryChatbotNoLLM.json b/src/backend/tests/data/MemoryChatbotNoLLM.json new file mode 100644 index 000000000000..55b5e29169fb --- /dev/null +++ b/src/backend/tests/data/MemoryChatbotNoLLM.json @@ -0,0 +1 @@ +{"id":"26c412c9-9e4a-406d-aadb-ef9a81badb3f","data":{"nodes":[{"data":{"description":"Create a prompt template with dynamic variables.","display_name":"Prompt","id":"Prompt-iWbCC","node":{"base_classes":["Message"],"beta":false,"conditional_paths":[],"custom_fields":{"template":["context","user_message"]},"description":"Create a prompt template with dynamic variables.","display_name":"Prompt","documentation":"","edited":false,"field_order":["template"],"frozen":false,"icon":"prompts","output_types":[],"outputs":[{"cache":true,"display_name":"Prompt Message","method":"build_prompt","name":"prompt","selected":"Message","types":["Message"],"value":"__UNDEFINED__"}],"pinned":false,"template":{"_type":"Component","code":{"advanced":true,"dynamic":true,"fileTypes":[],"file_path":"","info":"","list":false,"load_from_db":false,"multiline":true,"name":"code","password":false,"placeholder":"","required":true,"show":true,"title_case":false,"type":"code","value":"from langflow.base.prompts.api_utils import process_prompt_template\nfrom langflow.custom import Component\nfrom langflow.io import Output, PromptInput\nfrom langflow.schema.message import Message\nfrom langflow.template.utils import update_template_values\n\n\nclass PromptComponent(Component):\n display_name: str = \"Prompt\"\n description: str = \"Create a prompt template with dynamic variables.\"\n icon = \"prompts\"\n trace_type = \"prompt\"\n name = \"Prompt\"\n\n inputs = [\n PromptInput(name=\"template\", display_name=\"Template\"),\n ]\n\n outputs = [\n Output(display_name=\"Prompt Message\", name=\"prompt\", method=\"build_prompt\"),\n ]\n\n async def build_prompt(\n self,\n ) -> Message:\n prompt = await Message.from_template_and_variables(**self._attributes)\n self.status = prompt.text\n return prompt\n\n def post_code_processing(self, new_frontend_node: dict, current_frontend_node: dict):\n \"\"\"\n This function is called after the code validation is done.\n \"\"\"\n frontend_node = super().post_code_processing(new_frontend_node, current_frontend_node)\n template = frontend_node[\"template\"][\"template\"][\"value\"]\n _ = process_prompt_template(\n template=template,\n name=\"template\",\n custom_fields=frontend_node[\"custom_fields\"],\n frontend_node_template=frontend_node[\"template\"],\n )\n # Now that template is updated, we need to grab any values that were set in the current_frontend_node\n # and update the frontend_node with those values\n update_template_values(new_template=frontend_node, previous_template=current_frontend_node[\"template\"])\n return 
frontend_node\n"},"context":{"advanced":false,"display_name":"context","dynamic":false,"field_type":"str","fileTypes":[],"file_path":"","info":"","input_types":["Message","Text"],"list":false,"load_from_db":false,"multiline":true,"name":"context","password":false,"placeholder":"","required":false,"show":true,"title_case":false,"type":"str","value":""},"template":{"advanced":false,"display_name":"Template","dynamic":false,"info":"","list":false,"load_from_db":false,"name":"template","placeholder":"","required":false,"show":true,"title_case":false,"trace_as_input":true,"type":"prompt","value":"{context}\n\nUser: {user_message}\nAI: "},"user_message":{"advanced":false,"display_name":"user_message","dynamic":false,"field_type":"str","fileTypes":[],"file_path":"","info":"","input_types":["Message","Text"],"list":false,"load_from_db":false,"multiline":true,"name":"user_message","password":false,"placeholder":"","required":false,"show":true,"title_case":false,"type":"str","value":""}}},"type":"Prompt"},"dragging":false,"height":494,"id":"Prompt-iWbCC","position":{"x":1880.8227904110583,"y":625.8049209882275},"positionAbsolute":{"x":1880.8227904110583,"y":625.8049209882275},"selected":false,"type":"genericNode","width":384},{"data":{"description":"Get chat inputs from the Playground.","display_name":"Chat Input","id":"ChatInput-CIGht","node":{"template":{"_type":"Component","files":{"trace_as_metadata":true,"file_path":"","fileTypes":["txt","md","mdx","csv","json","yaml","yml","xml","html","htm","pdf","docx","py","sh","sql","js","ts","tsx","jpg","jpeg","png","bmp","image"],"list":true,"required":false,"placeholder":"","show":true,"value":"","name":"files","display_name":"Files","advanced":true,"dynamic":false,"info":"Files to be sent with the message.","title_case":false,"type":"file"},"code":{"type":"code","required":true,"placeholder":"","list":false,"show":true,"multiline":true,"value":"from langflow.base.data.utils import IMG_FILE_TYPES, TEXT_FILE_TYPES\nfrom langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, FileInput, MessageTextInput, MultilineInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\n\n\nclass ChatInput(ChatComponent):\n display_name = \"Chat Input\"\n description = \"Get chat inputs from the Playground.\"\n icon = \"ChatInput\"\n name = \"ChatInput\"\n\n inputs = [\n MultilineInput(\n name=\"input_value\",\n display_name=\"Text\",\n value=\"\",\n info=\"Message to be passed as input.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[\"Machine\", \"User\"],\n value=\"User\",\n info=\"Type of sender.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=\"User\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\", display_name=\"Session ID\", info=\"Session ID for the message.\", advanced=True\n ),\n FileInput(\n name=\"files\",\n display_name=\"Files\",\n file_types=TEXT_FILE_TYPES + IMG_FILE_TYPES,\n info=\"Files to be sent with the message.\",\n advanced=True,\n is_list=True,\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n 
text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n files=self.files,\n )\n\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n","fileTypes":[],"file_path":"","password":false,"name":"code","advanced":true,"dynamic":true,"info":"","load_from_db":false,"title_case":false},"input_value":{"trace_as_input":true,"multiline":true,"trace_as_metadata":true,"load_from_db":false,"list":false,"required":false,"placeholder":"","show":true,"value":"","name":"input_value","display_name":"Text","advanced":false,"input_types":["Message"],"dynamic":false,"info":"Message to be passed as input.","title_case":false,"type":"str"},"sender":{"trace_as_metadata":true,"options":["Machine","User"],"required":false,"placeholder":"","show":true,"value":"User","name":"sender","display_name":"Sender Type","advanced":true,"dynamic":false,"info":"Type of sender.","title_case":false,"type":"str"},"sender_name":{"trace_as_input":true,"trace_as_metadata":true,"load_from_db":false,"list":false,"required":false,"placeholder":"","show":true,"value":"User","name":"sender_name","display_name":"Sender Name","advanced":true,"input_types":["Message"],"dynamic":false,"info":"Name of the sender.","title_case":false,"type":"str"},"session_id":{"trace_as_input":true,"trace_as_metadata":true,"load_from_db":false,"list":false,"required":false,"placeholder":"","show":true,"value":"","name":"session_id","display_name":"Session ID","advanced":true,"input_types":["Message"],"dynamic":false,"info":"Session ID for the message.","title_case":false,"type":"str"},"should_store_message":{"trace_as_metadata":true,"list":false,"required":false,"placeholder":"","show":true,"value":true,"name":"should_store_message","display_name":"Store Messages","advanced":true,"dynamic":false,"info":"Store the message in the history.","title_case":false,"type":"bool"}},"description":"Get chat inputs from the Playground.","icon":"ChatInput","base_classes":["Message"],"display_name":"Chat Input","documentation":"","custom_fields":{},"output_types":[],"pinned":false,"conditional_paths":[],"frozen":false,"outputs":[{"types":["Message"],"selected":"Message","name":"message","display_name":"Message","method":"message_response","value":"__UNDEFINED__","cache":true}],"field_order":["input_value","should_store_message","sender","sender_name","session_id","files"],"beta":false,"edited":false},"type":"ChatInput"},"dragging":false,"height":294,"id":"ChatInput-CIGht","position":{"x":1275.9262193671882,"y":836.1228056896347},"positionAbsolute":{"x":1275.9262193671882,"y":836.1228056896347},"selected":false,"type":"genericNode","width":384},{"data":{"description":"Display a chat message in the Playground.","display_name":"Chat Output","id":"ChatOutput-QA7ej","node":{"template":{"_type":"Component","code":{"type":"code","required":true,"placeholder":"","list":false,"show":true,"multiline":true,"value":"from langflow.base.io.chat import ChatComponent\nfrom langflow.inputs import BoolInput\nfrom langflow.io import DropdownInput, MessageTextInput, Output\nfrom langflow.memory import store_message\nfrom langflow.schema.message import Message\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n icon = \"ChatOutput\"\n name = 
\"ChatOutput\"\n\n inputs = [\n MessageTextInput(\n name=\"input_value\",\n display_name=\"Text\",\n info=\"Message to be passed as output.\",\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[\"Machine\", \"User\"],\n value=\"Machine\",\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\", display_name=\"Sender Name\", info=\"Name of the sender.\", value=\"AI\", advanced=True\n ),\n MessageTextInput(\n name=\"session_id\", display_name=\"Session ID\", info=\"Session ID for the message.\", advanced=True\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n ]\n outputs = [\n Output(display_name=\"Message\", name=\"message\", method=\"message_response\"),\n ]\n\n def message_response(self) -> Message:\n message = Message(\n text=self.input_value,\n sender=self.sender,\n sender_name=self.sender_name,\n session_id=self.session_id,\n )\n if (\n self.session_id\n and isinstance(message, Message)\n and isinstance(message.text, str)\n and self.should_store_message\n ):\n store_message(\n message,\n flow_id=self.graph.flow_id,\n )\n self.message.value = message\n\n self.status = message\n return message\n","fileTypes":[],"file_path":"","password":false,"name":"code","advanced":true,"dynamic":true,"info":"","load_from_db":false,"title_case":false},"data_template":{"trace_as_input":true,"trace_as_metadata":true,"load_from_db":false,"list":false,"required":false,"placeholder":"","show":true,"value":"{text}","name":"data_template","display_name":"Data Template","advanced":true,"input_types":["Message"],"dynamic":false,"info":"Template to convert Data to Text. 
If left empty, it will be dynamically set to the Data's text key.","title_case":false,"type":"str"},"input_value":{"trace_as_input":true,"trace_as_metadata":true,"load_from_db":false,"list":false,"required":false,"placeholder":"","show":true,"value":"","name":"input_value","display_name":"Text","advanced":false,"input_types":["Message"],"dynamic":false,"info":"Message to be passed as output.","title_case":false,"type":"str"},"sender":{"trace_as_metadata":true,"options":["Machine","User"],"required":false,"placeholder":"","show":true,"value":"Machine","name":"sender","display_name":"Sender Type","advanced":true,"dynamic":false,"info":"Type of sender.","title_case":false,"type":"str"},"sender_name":{"trace_as_input":true,"trace_as_metadata":true,"load_from_db":false,"list":false,"required":false,"placeholder":"","show":true,"value":"AI","name":"sender_name","display_name":"Sender Name","advanced":true,"input_types":["Message"],"dynamic":false,"info":"Name of the sender.","title_case":false,"type":"str"},"session_id":{"trace_as_input":true,"trace_as_metadata":true,"load_from_db":false,"list":false,"required":false,"placeholder":"","show":true,"value":"","name":"session_id","display_name":"Session ID","advanced":true,"input_types":["Message"],"dynamic":false,"info":"Session ID for the message.","title_case":false,"type":"str"},"should_store_message":{"trace_as_metadata":true,"list":false,"required":false,"placeholder":"","show":true,"value":true,"name":"should_store_message","display_name":"Store Messages","advanced":true,"dynamic":false,"info":"Store the message in the history.","title_case":false,"type":"bool"}},"description":"Display a chat message in the Playground.","icon":"ChatOutput","base_classes":["Message"],"display_name":"Chat Output","documentation":"","custom_fields":{},"output_types":[],"pinned":false,"conditional_paths":[],"frozen":false,"outputs":[{"types":["Message"],"selected":"Message","name":"message","display_name":"Message","method":"message_response","value":"__UNDEFINED__","cache":true}],"field_order":["input_value","should_store_message","sender","sender_name","session_id","data_template"],"beta":false,"edited":false},"type":"ChatOutput"},"height":294,"id":"ChatOutput-QA7ej","position":{"x":2487.48936094892,"y":703.7197762654707},"selected":true,"type":"genericNode","width":384,"positionAbsolute":{"x":2487.48936094892,"y":703.7197762654707},"dragging":true},{"data":{"description":"Retrieves stored chat messages from Langflow tables or an external memory.","display_name":"Chat Memory","id":"Memory-amN4Z","node":{"base_classes":["BaseChatMemory","Data","Message"],"beta":false,"conditional_paths":[],"custom_fields":{},"description":"Retrieves stored chat messages from Langflow tables or an external memory.","display_name":"Chat Memory","documentation":"","edited":false,"field_order":["memory","sender","sender_name","n_messages","session_id","order","template"],"frozen":false,"icon":"message-square-more","output_types":[],"outputs":[{"cache":true,"display_name":"Messages (Data)","method":"retrieve_messages","name":"messages","selected":"Data","types":["Data"],"value":"__UNDEFINED__"},{"cache":true,"display_name":"Messages 
(Text)","method":"retrieve_messages_as_text","name":"messages_text","selected":"Message","types":["Message"],"value":"__UNDEFINED__"},{"cache":true,"display_name":"Memory","method":"build_lc_memory","name":"lc_memory","selected":"BaseChatMemory","types":["BaseChatMemory"],"value":"__UNDEFINED__"}],"pinned":false,"template":{"_type":"Component","code":{"advanced":true,"dynamic":true,"fileTypes":[],"file_path":"","info":"","list":false,"load_from_db":false,"multiline":true,"name":"code","password":false,"placeholder":"","required":true,"show":true,"title_case":false,"type":"code","value":"from langflow.custom import Component\nfrom langflow.helpers.data import data_to_text\nfrom langflow.inputs import HandleInput\nfrom langflow.io import DropdownInput, IntInput, MessageTextInput, MultilineInput, Output\nfrom langflow.memory import get_messages, LCBuiltinChatMemory\nfrom langflow.schema import Data\nfrom langflow.schema.message import Message\nfrom langflow.field_typing import BaseChatMemory\nfrom langchain.memory import ConversationBufferMemory\n\n\nclass MemoryComponent(Component):\n display_name = \"Chat Memory\"\n description = \"Retrieves stored chat messages from Langflow tables or an external memory.\"\n icon = \"message-square-more\"\n name = \"Memory\"\n\n inputs = [\n HandleInput(\n name=\"memory\",\n display_name=\"External Memory\",\n input_types=[\"BaseChatMessageHistory\"],\n info=\"Retrieve messages from an external memory. If empty, it will use the Langflow tables.\",\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[\"Machine\", \"User\", \"Machine and User\"],\n value=\"Machine and User\",\n info=\"Type of sender.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n advanced=True,\n ),\n IntInput(\n name=\"n_messages\",\n display_name=\"Number of Messages\",\n value=100,\n info=\"Number of messages to retrieve.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"Session ID of the chat history.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"order\",\n display_name=\"Order\",\n options=[\"Ascending\", \"Descending\"],\n value=\"Ascending\",\n info=\"Order of the messages.\",\n advanced=True,\n ),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. 
It can contain the keys {text}, {sender} or any other key in the message data.\",\n value=\"{sender_name}: {text}\",\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Messages (Data)\", name=\"messages\", method=\"retrieve_messages\"),\n Output(display_name=\"Messages (Text)\", name=\"messages_text\", method=\"retrieve_messages_as_text\"),\n Output(display_name=\"Memory\", name=\"lc_memory\", method=\"build_lc_memory\"),\n ]\n\n def retrieve_messages(self) -> Data:\n sender = self.sender\n sender_name = self.sender_name\n session_id = self.session_id\n n_messages = self.n_messages\n order = \"DESC\" if self.order == \"Descending\" else \"ASC\"\n\n if sender == \"Machine and User\":\n sender = None\n\n if self.memory:\n # override session_id\n self.memory.session_id = session_id\n\n stored = self.memory.messages\n if sender:\n expected_type = \"Machine\" if sender == \"Machine\" else \"User\"\n stored = [m for m in stored if m.type == expected_type]\n if order == \"ASC\":\n stored = stored[::-1]\n if n_messages:\n stored = stored[:n_messages]\n stored = [Message.from_lc_message(m) for m in stored]\n else:\n stored = get_messages(\n sender=sender,\n sender_name=sender_name,\n session_id=session_id,\n limit=n_messages,\n order=order,\n )\n self.status = stored\n return stored\n\n def retrieve_messages_as_text(self) -> Message:\n stored_text = data_to_text(self.template, self.retrieve_messages())\n self.status = stored_text\n return Message(text=stored_text)\n\n def build_lc_memory(self) -> BaseChatMemory:\n if self.memory:\n chat_memory = self.memory\n else:\n chat_memory = LCBuiltinChatMemory(flow_id=self.graph.flow_id, session_id=self.session_id)\n return ConversationBufferMemory(chat_memory=chat_memory)\n"},"memory":{"advanced":false,"display_name":"External Memory","dynamic":false,"info":"Retrieve messages from an external memory. 
If empty, it will use the Langflow tables.","input_types":["BaseChatMessageHistory"],"list":false,"name":"memory","placeholder":"","required":false,"show":true,"title_case":false,"trace_as_metadata":true,"type":"other","value":""},"n_messages":{"advanced":true,"display_name":"Number of Messages","dynamic":false,"info":"Number of messages to retrieve.","list":false,"name":"n_messages","placeholder":"","required":false,"show":true,"title_case":false,"trace_as_metadata":true,"type":"int","value":100},"order":{"advanced":true,"display_name":"Order","dynamic":false,"info":"Order of the messages.","name":"order","options":["Ascending","Descending"],"placeholder":"","required":false,"show":true,"title_case":false,"trace_as_metadata":true,"type":"str","value":"Ascending"},"sender":{"advanced":true,"display_name":"Sender Type","dynamic":false,"info":"Type of sender.","name":"sender","options":["Machine","User","Machine and User"],"placeholder":"","required":false,"show":true,"title_case":false,"trace_as_metadata":true,"type":"str","value":"Machine and User"},"sender_name":{"advanced":true,"display_name":"Sender Name","dynamic":false,"info":"Name of the sender.","input_types":["Message"],"list":false,"load_from_db":false,"name":"sender_name","placeholder":"","required":false,"show":true,"title_case":false,"trace_as_input":true,"trace_as_metadata":true,"type":"str","value":""},"session_id":{"advanced":true,"display_name":"Session ID","dynamic":false,"info":"Session ID of the chat history.","input_types":["Message"],"list":false,"load_from_db":false,"name":"session_id","placeholder":"","required":false,"show":true,"title_case":false,"trace_as_input":true,"trace_as_metadata":true,"type":"str","value":""},"template":{"advanced":true,"display_name":"Template","dynamic":false,"info":"The template to use for formatting the data. 
It can contain the keys {text}, {sender} or any other key in the message data.","input_types":["Message"],"list":false,"load_from_db":false,"multiline":true,"name":"template","placeholder":"","required":false,"show":true,"title_case":false,"trace_as_input":true,"trace_as_metadata":true,"type":"str","value":"{sender_name}: {text}"}}},"type":"Memory"},"dragging":false,"height":366,"id":"Memory-amN4Z","position":{"x":1308.5775646859402,"y":406.95204412025845},"positionAbsolute":{"x":1308.5775646859402,"y":406.95204412025845},"selected":false,"type":"genericNode","width":384}],"edges":[{"className":"","data":{"sourceHandle":{"dataType":"ChatInput","id":"ChatInput-CIGht","name":"message","output_types":["Message"]},"targetHandle":{"fieldName":"user_message","id":"Prompt-iWbCC","inputTypes":["Message","Text"],"type":"str"}},"id":"reactflow__edge-ChatInput-CIGht{œdataTypeœ:œChatInputœ,œidœ:œChatInput-CIGhtœ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-Prompt-iWbCC{œfieldNameœ:œuser_messageœ,œidœ:œPrompt-iWbCCœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}","source":"ChatInput-CIGht","sourceHandle":"{œdataTypeœ:œChatInputœ,œidœ:œChatInput-CIGhtœ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}","target":"Prompt-iWbCC","targetHandle":"{œfieldNameœ:œuser_messageœ,œidœ:œPrompt-iWbCCœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}"},{"className":"","data":{"sourceHandle":{"dataType":"Memory","id":"Memory-amN4Z","name":"messages_text","output_types":["Message"]},"targetHandle":{"fieldName":"context","id":"Prompt-iWbCC","inputTypes":["Message","Text"],"type":"str"}},"id":"reactflow__edge-Memory-amN4Z{œdataTypeœ:œMemoryœ,œidœ:œMemory-amN4Zœ,œnameœ:œmessages_textœ,œoutput_typesœ:[œMessageœ]}-Prompt-iWbCC{œfieldNameœ:œcontextœ,œidœ:œPrompt-iWbCCœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}","source":"Memory-amN4Z","sourceHandle":"{œdataTypeœ:œMemoryœ,œidœ:œMemory-amN4Zœ,œnameœ:œmessages_textœ,œoutput_typesœ:[œMessageœ]}","target":"Prompt-iWbCC","targetHandle":"{œfieldNameœ:œcontextœ,œidœ:œPrompt-iWbCCœ,œinputTypesœ:[œMessageœ,œTextœ],œtypeœ:œstrœ}"},{"source":"Prompt-iWbCC","sourceHandle":"{œdataTypeœ:œPromptœ,œidœ:œPrompt-iWbCCœ,œnameœ:œpromptœ,œoutput_typesœ:[œMessageœ]}","target":"ChatOutput-QA7ej","targetHandle":"{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-QA7ejœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}","data":{"targetHandle":{"fieldName":"input_value","id":"ChatOutput-QA7ej","inputTypes":["Message"],"type":"str"},"sourceHandle":{"dataType":"Prompt","id":"Prompt-iWbCC","name":"prompt","output_types":["Message"]}},"id":"reactflow__edge-Prompt-iWbCC{œdataTypeœ:œPromptœ,œidœ:œPrompt-iWbCCœ,œnameœ:œpromptœ,œoutput_typesœ:[œMessageœ]}-ChatOutput-QA7ej{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-QA7ejœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}"}],"viewport":{"x":-417.45799796990354,"y":3.1615551909424653,"zoom":0.45494095964690673}},"description":"This project can be used as a starting point for building a Chat experience with user specific memory. 
You can set a different Session ID to start a new message history.","name":"MemoryChatbotNoLLM","last_tested_version":"1.0.12","endpoint_name":null,"is_component":false}
\ No newline at end of file

From 628fdbc1fbc658bf9327e3464ef794d618319bbd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nicol=C3=B2=20Boschi?=
Date: Fri, 2 Aug 2024 14:20:30 +0200
Subject: [PATCH 8/9] fix mypy

---
 src/backend/base/langflow/api/v1/chat.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py
index 51713127474a..9cbc6bdd5ab1 100644
--- a/src/backend/base/langflow/api/v1/chat.py
+++ b/src/backend/base/langflow/api/v1/chat.py
@@ -388,7 +388,7 @@ async def event_generator(queue: asyncio.Queue, client_consumed_queue: asyncio.Q
         send_event("end", {}, queue)
         await queue.put((None, None, time.time))
 
-    async def consume_and_yield(queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> None:
+    async def consume_and_yield(queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> typing.AsyncGenerator:
         while True:
             event_id, value, put_time = await queue.get()
             if value is None:
@@ -401,8 +401,8 @@ async def consume_and_yield(queue: asyncio.Queue, client_consumed_queue: asyncio
                 f"consumed event {str(event_id)} (time in queue, {get_time - put_time:.4f}, client {get_time_yield - get_time:.4f})"
             )
 
-    asyncio_queue = asyncio.Queue()
-    asyncio_queue_client_consumed = asyncio.Queue()
+    asyncio_queue: asyncio.Queue = asyncio.Queue()
+    asyncio_queue_client_consumed: asyncio.Queue = asyncio.Queue()
     main_task = asyncio.create_task(event_generator(asyncio_queue, asyncio_queue_client_consumed))
 
     def on_disconnect():

From 1b7d3bf9e53474a6e93c5a7ada7d95f4ef026281 Mon Sep 17 00:00:00 2001
From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com>
Date: Fri, 2 Aug 2024 12:21:25 +0000
Subject: [PATCH 9/9] [autofix.ci] apply automated fixes

---
 src/backend/tests/conftest.py                |  3 ++-
 src/backend/tests/unit/test_chat_endpoint.py | 19 +++++++++++++------
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/src/backend/tests/conftest.py b/src/backend/tests/conftest.py
index a328ffe67106..2dc4f5843422 100644
--- a/src/backend/tests/conftest.py
+++ b/src/backend/tests/conftest.py
@@ -71,7 +71,7 @@ def get_text():
         pytest.CHAT_INPUT,
         pytest.TWO_OUTPUTS,
         pytest.VECTOR_STORE_PATH,
-        pytest.MEMORY_CHATBOT_NO_LLM
+        pytest.MEMORY_CHATBOT_NO_LLM,
     ]:
         assert path.exists(), f"File {path} does not exist. Available files: {list(data_path.iterdir())}"
@@ -239,6 +239,7 @@ def json_memory_chatbot_no_llm():
     with open(pytest.MEMORY_CHATBOT_NO_LLM, "r") as f:
         return f.read()
 
+
 @pytest.fixture(name="client", autouse=True)
 def client_fixture(session: Session, monkeypatch, request, load_flows_dir):
     # Set the database url to a test database
diff --git a/src/backend/tests/unit/test_chat_endpoint.py b/src/backend/tests/unit/test_chat_endpoint.py
index 23c2e37494ce..81d0016ec23d 100644
--- a/src/backend/tests/unit/test_chat_endpoint.py
+++ b/src/backend/tests/unit/test_chat_endpoint.py
@@ -14,13 +14,14 @@ def test_build_flow(client, json_memory_chatbot_no_llm, logged_in_headers):
 
     check_messages(flow_id)
 
+
 def test_build_flow_from_request_data(client, json_memory_chatbot_no_llm, logged_in_headers):
     flow_id = _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)
     flow_data = client.get("api/v1/flows/" + str(flow_id), headers=logged_in_headers).json()
 
-    with client.stream("POST", f"api/v1/build/{flow_id}/flow", json={
-        "data": flow_data["data"]
-    }, headers=logged_in_headers) as r:
+    with client.stream(
+        "POST", f"api/v1/build/{flow_id}/flow", json={"data": flow_data["data"]}, headers=logged_in_headers
+    ) as r:
         consume_and_assert_stream(r)
 
     check_messages(flow_id)
@@ -31,7 +32,11 @@ def test_build_flow_with_frozen_path(client, json_memory_chatbot_no_llm, logged_
     flow_data = client.get("api/v1/flows/" + str(flow_id), headers=logged_in_headers).json()
     flow_data["data"]["nodes"][0]["data"]["node"]["frozen"] = True
 
-    response = client.patch("api/v1/flows/" + str(flow_id), json=FlowUpdate(name="Flow", description="description", data=flow_data["data"]).model_dump(), headers=logged_in_headers)
+    response = client.patch(
+        "api/v1/flows/" + str(flow_id),
+        json=FlowUpdate(name="Flow", description="description", data=flow_data["data"]).model_dump(),
+        headers=logged_in_headers,
+    )
     response.raise_for_status()
 
     with client.stream("POST", f"api/v1/build/{flow_id}/flow", json={}, headers=logged_in_headers) as r:
@@ -39,6 +44,7 @@ def test_build_flow_with_frozen_path(client, json_memory_chatbot_no_llm, logged_
 
     check_messages(flow_id)
 
+
 def check_messages(flow_id):
     messages = get_messages(flow_id=UUID(flow_id), order="ASC")
     assert len(messages) == 2
@@ -50,6 +56,7 @@ def check_messages(flow_id):
     assert messages[1].sender == "Machine"
     assert messages[1].sender_name == "AI"
 
+
 def consume_and_assert_stream(r):
     count = 0
     for line in r.iter_lines():
@@ -61,11 +68,11 @@ def consume_and_assert_stream(r):
             assert parsed["event"] == "vertices_sorted"
             ids = parsed["data"]["ids"]
            ids.sort()
-            assert ids == ['ChatInput-CIGht', 'Memory-amN4Z']
+            assert ids == ["ChatInput-CIGht", "Memory-amN4Z"]
             to_run = parsed["data"]["to_run"]
             to_run.sort()
-            assert to_run == ['ChatInput-CIGht', 'ChatOutput-QA7ej', 'Memory-amN4Z', 'Prompt-iWbCC']
+            assert to_run == ["ChatInput-CIGht", "ChatOutput-QA7ej", "Memory-amN4Z", "Prompt-iWbCC"]
         elif count > 0 and count < 5:
             assert parsed["event"] == "end_vertex"
             assert parsed["data"]["build_data"] is not None