Skip to content

Commit

Permalink
🎯 feat: migrate legacy out on ingestion task.
Browse files Browse the repository at this point in the history
  • Loading branch information
korawica committed May 10, 2024
1 parent 7a87ea8 commit 58b69d0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 15 deletions.
24 changes: 10 additions & 14 deletions app/blueprints/api/ingestion/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
import queue
from typing import Callable

from app.core.__legacy.objects import (
Node as LegacyNode,
)
from app.core.base import (
get_plural,
registers,
Expand Down Expand Up @@ -82,9 +79,7 @@ def ingestion_foreground(module: str, external_parameters: dict) -> Result:
fwk_params={
"run_id": task.id,
"run_date": run_date,
"run_mode": task.parameters.others.get(
"run_mode", "ingestion"
),
"run_mode": task.component,
"task_params": task.parameters,
},
ext_params=external_parameters,
Expand Down Expand Up @@ -127,14 +122,15 @@ def ingestion_background(
bg_queue.put(task.id)
for idx, run_date in task.runner():
logger.info(f"{f'[ run_date: {run_date} ]':=<60}")
node: LegacyNode = LegacyNode(
name=task.parameters.name,
process_id=task.id,
run_mode=task.component.value,
run_date=run_date,
auto_init=task.parameters.others.get("initial_data", "N"),
auto_drop=task.parameters.others.get("drop_before_create", "N"),
external_parameters=external_parameters,
node: Node = Node.start(
task.parameters.name,
fwk_params={
"run_id": task.id,
"run_date": run_date,
"run_mode": task.component,
"task_params": task.parameters,
},
ext_params=external_parameters,
)
bg_queue.put(task.parameters.name)
logger.info(f"START {idx:02d}: {f'{node.name} ':~<50}")
Expand Down
3 changes: 2 additions & 1 deletion app/core/convertor.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,8 @@ def _generate_merge(self, values: Union[dict, list]) -> tuple:
]

def merge_with_key(
_data: dict, _key: Optional[str] = "data_merge"
_data: dict,
_key: Optional[str] = "data_merge",
) -> list[dict]:
if _key not in _data:
return [_data]
Expand Down

0 comments on commit 58b69d0

Please sign in to comment.