Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Add asyncio based driver and related components #167

Closed
skrawcz opened this issue Aug 2, 2022 · 5 comments · Fixed by #171
Closed

Add asyncio based driver and related components #167

skrawcz opened this issue Aug 2, 2022 · 5 comments · Fixed by #171
Labels
enhancement New feature or request

Comments

@skrawcz
Copy link
Collaborator

skrawcz commented Aug 2, 2022

Is your feature request related to a problem? Please describe.
It is conceivable to want to write Hamilton functions in an async based way.

For example, the data loading functions use asyncio based libraries, or you're making external web requests within a function...

However, Hamilton isn't set up to deal with async based functions when Hamilton itself is being run in an asyncio event loop.

Describe the solution you'd like
Since this is a different context. We should have a different Driver and related components that handle async functions and operating in an async manner. E.g. async def execute() should be the driver function that needs to be awaited.
E.g. something like this in a fastapi app:

from hamilton import driver
dag = driver.Driver({...}, modules, adapter=...)

@app.get("/endpoint")
async def compute( ... ):
    result = await dag.execute([output], inputs=data)
    # transform result for fastapi
    return result

Describe alternatives you've considered
If running Hamilton within an event loop is required:

  • Instead of doing this, you can instead separate the I/O based functions and do them outside of Hamilton. That way Hamilton remains pure computation and doesn't need to know about the asyncio world.
    E.g. From within an fast api app:
from hamilton import driver
dag = driver.Driver({...}, modules, adapter=...)

@app.get("/endpoint")
async def compute( ... ):
    data = await pull_from_db(...)
    result = dag.execute([output], inputs=data) # overrides= could also be used.
    # transform result for fastapi
    return result

If running async functions is required, but Hamilton itself is not being run in an event loop:

  • the functions requiring to call asyncio code use a primitive like asyncio.run() to wrap that code so Hamilton doesn't need to know about anything asyncio.

Additional context
Slack discussion - https://hamilton-opensource.slack.com/archives/C03M33QB4M8/p1659463180270219.

@skrawcz skrawcz added the enhancement New feature or request label Aug 2, 2022
@skrawcz
Copy link
Collaborator Author

skrawcz commented Aug 2, 2022

What does not work when run within a FastAPI web app::

async def get_from_db(...):
     return await db_load

def my_function(...):
      result =  asyncio.run(get_from_db(...))
      return result

Error:

RuntimeError: asyncio.run() cannot be called from a running event loop

What kind of works with the FastAPI:
You can schedule tasks with Hamilton -- but you cannot await them.

async def _log_result(result: str) -> None:
    """simulates logging asynchronously somewhere"""
    print(f'started {result}')
    await asyncio.sleep(2)
    print(f'finished {result}')


def log_result(result: str, event_loop: asyncio.AbstractEventLoop) -> dict:
    task = event_loop.create_task(_log_result(result))
    print("scheduled task", task)
    return {'result': True}

If Hamilton executes this, the _log_result coroutine will be scheduled after serving the response to the web request.

@skrawcz
Copy link
Collaborator Author

skrawcz commented Aug 3, 2022

@elijahbenizzy
Copy link
Collaborator

Implementation here! #171

@elijahbenizzy
Copy link
Collaborator

This is completed, needs to be released

@skrawcz skrawcz linked a pull request Aug 22, 2022 that will close this issue
12 tasks
@skrawcz
Copy link
Collaborator Author

skrawcz commented Aug 22, 2022

Released as part of 1.10.0

@skrawcz skrawcz closed this as completed Aug 22, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants