Skip to content

Commit

Permalink
[autofix.ci] apply automated fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
autofix-ci[bot] authored Jul 31, 2024
1 parent 205fdd7 commit 44e8548
Showing 1 changed file with 20 additions and 25 deletions.
45 changes: 20 additions & 25 deletions src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ async def retrieve_vertices_order(
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc


@router.post("/build/{flow_id}/flow")
async def build_flow(
background_tasks: BackgroundTasks,
Expand All @@ -158,9 +159,7 @@ async def build_flow(
current_user=Depends(get_current_active_user),
telemetry_service: "TelemetryService" = Depends(get_telemetry_service),
session=Depends(get_session),

):

async def get_vertices_order() -> VerticesOrderResponse:
start_time = time.perf_counter()
components_count = None
Expand Down Expand Up @@ -211,7 +210,6 @@ async def get_vertices_order() -> VerticesOrderResponse:
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc


async def _build_vertex(vertex_id: str) -> VertexBuildResponse:
flow_id_str = str(flow_id)

Expand Down Expand Up @@ -340,12 +338,8 @@ async def _build_vertex(vertex_id: str) -> VertexBuildResponse:
message = parse_exception(exc)
raise HTTPException(status_code=500, detail=message) from exc


async def send_event(event_type: str, value: dict, queue: asyncio.Queue) -> None:
json_data = {
"event": event_type,
"data": value
}
json_data = {"event": event_type, "data": value}
logger.debug(f"sending event {event_type}")
await queue.put(json.dumps(json_data))

Expand All @@ -359,9 +353,7 @@ async def build_vertices(vertex_id: str, queue: asyncio.Queue) -> None:

vertex_build_response: VertexBuildResponse = build_task.result()
# send built event or error event
await send_event("end_vertex", {
"build_data": json.loads(vertex_build_response.model_dump_json())},
queue)
await send_event("end_vertex", {"build_data": json.loads(vertex_build_response.model_dump_json())}, queue)
if vertex_build_response.valid:
if vertex_build_response.next_vertices_ids:
tasks = []
Expand All @@ -375,15 +367,12 @@ async def build_vertices(vertex_id: str, queue: asyncio.Queue) -> None:
task.cancel()
return


async def event_generator(queue: asyncio.Queue) -> None:
logger.debug("Starting event generator")
order_response = await get_vertices_order()
await send_event("vertices_sorted",
{
"ids": order_response.ids,
"to_run": order_response.vertices_to_run
}, queue)
await send_event(
"vertices_sorted", {"ids": order_response.ids, "to_run": order_response.vertices_to_run}, queue
)
to_build_ids = order_response.ids
tasks = []
for vertex_id in to_build_ids:
Expand All @@ -407,19 +396,26 @@ async def consume_and_yield(queue: asyncio.Queue) -> None:

asyncio_queue = asyncio.Queue()
main_task = asyncio.create_task(event_generator(asyncio_queue))

def on_disconnect():
logger.debug("Client disconnected, closing tasks")
main_task.cancel()

return DisconnectHandlerStreamingResponse(consume_and_yield(asyncio_queue),
media_type="application/x-ndjson",
on_disconnect=on_disconnect)
return DisconnectHandlerStreamingResponse(
consume_and_yield(asyncio_queue), media_type="application/x-ndjson", on_disconnect=on_disconnect
)

class DisconnectHandlerStreamingResponse(StreamingResponse):

def __init__(self,
content: ContentStream, status_code: int = 200, headers: typing.Mapping[str, str] | None = None,
media_type: str | None = None, background: BackgroundTask | None = None, on_disconnect: Optional[typing.Callable] = None):
class DisconnectHandlerStreamingResponse(StreamingResponse):
def __init__(
self,
content: ContentStream,
status_code: int = 200,
headers: typing.Mapping[str, str] | None = None,
media_type: str | None = None,
background: BackgroundTask | None = None,
on_disconnect: Optional[typing.Callable] = None,
):
super().__init__(content, status_code, headers, media_type, background)
self.on_disconnect = on_disconnect

Expand All @@ -432,7 +428,6 @@ async def listen_for_disconnect(self, receive: Receive) -> None:
break



@router.post("/build/{flow_id}/vertices/{vertex_id}")
async def build_vertex(
flow_id: uuid.UUID,
Expand Down

0 comments on commit 44e8548

Please sign in to comment.