From 223b114b9040c2b14b9051c531e7c3814ff6a197 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Wed, 29 Nov 2023 23:31:31 -0500 Subject: [PATCH] Add FastAPI example (#581) * add fastapi dir and change dep to rocksdict * commit inital example * update example * use lifespan * reorganize to have an endpoint trigger a producer * reorganize timer to be above fast api decorator --- examples/django/requirements/default.txt | 2 +- examples/fastapi_example.py | 72 ++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) create mode 100755 examples/fastapi_example.py diff --git a/examples/django/requirements/default.txt b/examples/django/requirements/default.txt index 042b4125b..e5aed2ea6 100644 --- a/examples/django/requirements/default.txt +++ b/examples/django/requirements/default.txt @@ -1,4 +1,4 @@ django -faust[rocksdb] +faust[rocksdict] eventlet faust-aioeventlet diff --git a/examples/fastapi_example.py b/examples/fastapi_example.py new file mode 100755 index 000000000..3e666fcda --- /dev/null +++ b/examples/fastapi_example.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python +import asyncio +from contextlib import asynccontextmanager +from typing import Union + +from fastapi import FastAPI + +import faust + + +# This is just hello_world.py integrated with a FastAPI application + + +def fake_answer_to_everything_ml_model(x: float): + return x * 42 + + +ml_models = {} + + +# You MUST have "app" defined in order for Faust to discover the app +# if you're using "faust" on CLI, but this doesn't work yet +faust_app = faust.App( + 'hello-world-fastapi', + broker='kafka://localhost:9092', + web_enabled=False, +) +# app = faust_app + +greetings_topic = faust_app.topic('greetings', value_type=str) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Load the ML model + ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model + await faust_app.start() + yield + # Clean up the ML models and release the resources + ml_models.clear() + await faust_app.stop() + + +app = fastapi_app = FastAPI( + lifespan=lifespan, +) +# For now, run via "uvicorn fastapi_example:app" +# then visit http://127.0.0.1:8000/docs + + +@fastapi_app.get("/") +def read_root(): + return {"Hello": "World"} + + +@fastapi_app.get("/items/{item_id}") +def read_item(item_id: int, q: Union[str, None] = None): + return {"item_id": item_id, "q": q} + + +@faust_app.agent(greetings_topic) +async def print_greetings(greetings): + async for greeting in greetings: + print(greeting) + + +@faust_app.timer(5) # make sure you *always* add the timer above if you're using one +@fastapi_app.get("/produce") +async def produce(): + for i in range(100): + await greetings_topic.send(value=f'hello {i}') + return {"success": True}