-
Notifications
You must be signed in to change notification settings - Fork 37
Async implementation of driver/adapter #171
Conversation
7908643
to
081db80
Compare
081db80
to
de39759
Compare
018fd96
to
a8667ca
Compare
a8667ca
to
f5c9826
Compare
Remaining:
|
Basically we pass the coroutine the entire way through. When a function depends on other data, we create a coroutine that awaits everything. Finally, we do a gather at the end. This involves no modification to the functiongraph code
f5c9826
to
6910d33
Compare
6910d33
to
97967a8
Compare
If we use tasks, we can await them twice. Furthermore, they'll begin scheduling earlier, and python handles keeping track of them. This is a much cleaner way to handle it.
examples/async/fastapi_example.py
Outdated
ad_hoc_utils.create_temporary_module( | ||
pipeline, | ||
computation1, | ||
foo, | ||
bar, | ||
some_data, | ||
computation2)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably not do this in an example. This doesn't feel very "ad hoc".
Otherwise 💡, fastapi could be a possible code compilation target too.
hamilton/experimental/h_async.py
Outdated
if display_graph: | ||
raise ValueError(f'display_graph=True is not supported for the async graph adapter. ' | ||
f'Instead you should be using visualize_execution.') | ||
return await await_dict_of_tasks({key: asyncio.create_task(process_value(memoized_computation[key])) for key in final_vars}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return await await_dict_of_tasks({key: asyncio.create_task(process_value(memoized_computation[key])) for key in final_vars}) | |
task_dict = {key: asyncio.create_task(process_value(memoized_computation[key])) for key in final_vars} | |
return await await_dict_of_tasks(task_dict) |
Also is process_value
needed here? Seems a bit redundant with the result being wrapped in a task and that awaited?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
process_value
is needed but create_task
is not I think...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay. But this seems like an important line that should be at least two... break out the dict comprehension at least.
hamilton/experimental/h_async.py
Outdated
callabl = node.callable | ||
|
||
async def new_fn(fn=callabl, **fn_kwargs): | ||
fn_kwargs = await await_dict_of_tasks({key: process_value(value) for key, value in fn_kwargs.items()}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can move this dict comprehension to its own line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just those two code nits, otherwise I think this looks good.
ca4b36f
to
bc3a7e2
Compare
1. Fixes docstring formats 2. Removes redundant call to create_task 3. Breaks example into modules
bc3a7e2
to
e044a40
Compare
@skrawcz this is what I meant
Basically we pass the coroutine the entire way through.
When a function depends on other data, we create a coroutine that
awaits everything. Finally, we do a gather at the end.
I think this is optimal but will need to dig in a bit/look into
things.
Otherwise, nifty POC with very rough edges.
The biggest hack is that I'm using a cache to store coroutine values as python makes it very hard to access them.
I think we can do better, but the point is we don't have to mess with execution.
[Short description explaining the high-level reason for the pull request]
Changes
Testing
Notes
Checklist
Testing checklist
Python - local testing