Skip to content

Commit

Permalink
refactor: add node
Browse files Browse the repository at this point in the history
  • Loading branch information
WLM1ke committed Apr 14, 2024
1 parent 27401de commit 9ff46fe
Showing 1 changed file with 8 additions and 18 deletions.
26 changes: 8 additions & 18 deletions poptimizer/app/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


class _Action[S: domain.State](Protocol):
async def __call__(self, ctx: domain.Ctx, state: S) -> bool: ...
async def __call__(self, ctx: domain.Ctx, state: S) -> None: ...


_DagID = NewType("_DagID", int)
Expand Down Expand Up @@ -81,26 +81,17 @@ def __init__(self, ctx_factory: uow.CtxFactory, state: S) -> None:
def id(self) -> _DagID:
return _DagID(id(self))

def add_node(self, action: _Action[S], *depends: _NodeUID) -> _NodeUID:
if self._status != _DagStatus.IDLE:
raise errors.AdaptersError("Can't add node to running dag")

if any(uid.dag != self.id for uid in depends):
raise errors.AdaptersError("Can't add node to dag which depends on other dag")

if set(depends) - set(self._nodes):
raise errors.AdaptersError("Can't add node to dag which depends on not existing node")

uid = _NodeUID(dag=self.id, node=_NodeID(len(self._nodes)))
def add_node_ignore_errors(self, action: _Action[S], *depends: _NodeUID) -> _NodeUID:
node = _Node(action=action, inputs_count=len(depends), retry=False)
self._nodes[uid] = node

for parent_uid in depends:
self._nodes[parent_uid].add_child(uid)

return uid
return self._add_node(node, *depends)

def add_node_with_retry(self, action: _Action[S], *depends: _NodeUID) -> _NodeUID:
node = _Node(action=action, inputs_count=len(depends), retry=True)

return self._add_node(node, *depends)

def _add_node(self, node: _Node[S], *depends: _NodeUID) -> _NodeUID:
if self._status != _DagStatus.IDLE:
raise errors.AdaptersError("Can't add node to running dag")

Expand All @@ -111,7 +102,6 @@ def add_node_with_retry(self, action: _Action[S], *depends: _NodeUID) -> _NodeUI
raise errors.AdaptersError("Can't add node to dag which depends on not existing node")

uid = _NodeUID(dag=self.id, node=_NodeID(len(self._nodes)))
node = _Node(action=action, inputs_count=len(depends), retry=True)
self._nodes[uid] = node

for parent_uid in depends:
Expand Down

0 comments on commit 9ff46fe

Please sign in to comment.