Skip to content

Commit

Permalink
Fixed timing issue for lazy inputs (#2890)
Browse files Browse the repository at this point in the history
  • Loading branch information
RunDevelopment authored May 20, 2024
1 parent 1d1b0f7 commit 4996d20
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
12 changes: 10 additions & 2 deletions backend/src/api/lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, factory: Callable[[], T]):
self._factory = _to_result(factory)
self._value: _Result[T] | None = None
self._evaluating = False
self._eval_time = 0

@staticmethod
def ready(value: T) -> Lazy[T]:
Expand All @@ -75,12 +76,17 @@ def supplier() -> T:
@property
def has_value(self) -> bool:
"""Returns True if the value has been computed, otherwise False."""
return self._value is not None
return self._value is not None and self._value.is_ok

@property
def has_error(self) -> bool:
"""Returns True if the value has been computed and it errored instead, otherwise False."""
return self._value is not None
return self._value is not None and not self._value.is_ok

@property
def evaluation_time(self) -> float:
"""The time in seconds that it took to evaluate the value. If the value is not computed, returns 0."""
return self._eval_time

@property
def value(self) -> T:
Expand All @@ -94,7 +100,9 @@ def value(self) -> T:
else:
self._evaluating = True
try:
start = time.time()
self._value = self._factory()
self._eval_time = time.time() - start
finally:
self._evaluating = False

Expand Down
8 changes: 8 additions & 0 deletions backend/src/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,15 @@ async def __process(self, node: Node) -> NodeOutput | CollectorOutput:
inputs = await self.__gather_inputs(node)
context = self.__get_node_context(node)

def get_lazy_evaluation_time():
return sum(i.evaluation_time for i in inputs if isinstance(i, Lazy))

await self.progress.suspend()
self.__send_node_start(node)
await self.progress.suspend()

lazy_time_before = get_lazy_evaluation_time()

output, execution_time = await self.loop.run_in_executor(
self.pool,
timed_supplier(
Expand All @@ -586,6 +591,9 @@ async def __process(self, node: Node) -> NodeOutput | CollectorOutput:
)
await self.progress.suspend()

lazy_time_after = get_lazy_evaluation_time()
execution_time -= lazy_time_after - lazy_time_before

if isinstance(output, RegularOutput):
await self.__send_node_broadcast(node, output.output)
self.__send_node_finish(node, execution_time)
Expand Down

0 comments on commit 4996d20

Please sign in to comment.