Skip to content

Commit

Permalink
⚙️ fixed: auto create from control setup.
Browse files Browse the repository at this point in the history
  • Loading branch information
korawica committed May 11, 2024
1 parent 4b7b108 commit ee4346e
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 71 deletions.
38 changes: 28 additions & 10 deletions app/controls.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ def push_schema_setup() -> None:
logger.info("Success: Create Schema to target database.")


def push_func_setup(task: Optional[Task] = None) -> None:
def push_func_setup(
task: Optional[Task] = None,
force_drop: bool = False,
) -> None:
"""Run Setup function in `register.yaml`"""
task: Task = task or Task.make(module="function_setup")
for idx, _func_prop in enumerate(registers.functions, start=1):
Expand All @@ -76,10 +79,18 @@ def push_func_setup(task: Optional[Task] = None) -> None:
f"Success: Setup {_func.name} "
f"with logging value {task.duration()} sec"
)
elif force_drop:
_func.drop()
_func.create()
logger.info(
f"Success: Delete and Setup {_func.name} "
f"with logging value {task.duration()} sec"
)


def push_ctr_setup(
task: Optional[Task] = None,
force_drop: bool = False,
) -> None:
"""Run Setup Control Framework table in `register.yaml`"""
task: Task = task or Task.make(module="control_setup")
Expand All @@ -91,13 +102,17 @@ def push_ctr_setup(
"run_id": task.id,
"run_date": f"{task.start_time:%Y-%m-%d}",
"run_mode": TaskComponent.RECREATED,
"task_params": task.parameters.add_others({"auto_init": "Y"}),
"task_params": task.parameters.add_others(
{
"auto_create": True,
"auto_init": True,
}
),
},
)
if force_drop:
_node.create(force_drop=force_drop).init()
logger.info(f"START {idx:02d}: {f'{_node.name} ':~<50}")
# NOTE: Create without logging.
if not _node.exists():
_node.create()
logger.info(
f"Success create {_ctr_prop['name']!r} "
f"after app start with status {status.value}"
Expand Down Expand Up @@ -290,22 +305,25 @@ def push_initialize_frontend(): ...


def push_testing() -> None:
from .core.services import Pipeline

Schema().create()
push_func_setup()
logger.info("Start Testing ...")
task = Task.make(module="demo_docstring").add_param_others(
{"auto_init": "Y"}
)
task = Task.make(module="demo_docstring").add_param_others({"dummy": "Y"})
_node = Node.parse_task(
name="ctr_data_parameter",
fwk_params={
"run_id": task.id,
"run_date": f"{task.start_time:%Y-%m-%d}",
"run_mode": TaskComponent.RECREATED,
"task_params": task.parameters.add_others({"auto_init": "Y"}),
"task_params": task.parameters.add_others(
{"auto_init": True, "auto_drop": True, "auto_create": True}
),
},
)
_node.create(force_drop=True)
_pipeline = Pipeline.parse_name("ctr_all")
print(_pipeline)
# (
# ActionQuery.parse_name(fullname="query:query_shutdown")
# .add_ext_params(
Expand Down
2 changes: 2 additions & 0 deletions app/core/__legacy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,7 @@ def pipe_schedule(self) -> list:
def pipe_trigger(self) -> list:
return self.pipe_catalog["trigger"]

# [x] Migrate to PipelineCatalog.schedule_type
@property
def pipe_schedule_type(self) -> str:
_result: str = ""
Expand All @@ -1245,6 +1246,7 @@ def pipe_schedule_type(self) -> str:
_result += "|schedule" if _result else "schedule"
return _result

# [x] Migrate to PipelineCatalog.nodes
@property
def pipe_nodes(self) -> dict:
return self.pipe_catalog["nodes"]
Expand Down
3 changes: 3 additions & 0 deletions app/core/__legacy/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1938,6 +1938,7 @@ def check_pipe_schedule(
self.pipe_ctr_schedule: dict = self.pull_pipe_from_ctr_schedule()
return True

# [x] Migrate to `Pipeline.pull_schedule`
def pull_pipe_from_ctr_schedule(
self,
pipe_id: Optional[str] = None,
Expand All @@ -1952,6 +1953,7 @@ def pull_pipe_from_ctr_schedule(
all_flag=all_flag,
)

# [x] Migrate to `Pipeline.make_watermark`
def push_pipe_to_ctr_schedule(self, push_values: Optional[dict] = None):
"""Push data information to the Control Data Logging."""
_push_values: dict = merge_dicts(
Expand All @@ -1966,6 +1968,7 @@ def push_pipe_to_ctr_schedule(self, push_values: Optional[dict] = None):
)
return Control("ctr_task_schedule").push(push_values=_push_values)

# [x] Migrate to `Pipeline.push`
def update_pipe_to_ctr_schedule(self, update_values: Optional[dict] = None):
"""Update tacking information to the Control Task Schedule."""
_update_values: dict = merge_dicts(
Expand Down
21 changes: 21 additions & 0 deletions app/core/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,24 @@ def prepare_rtt_column(cls, value):
"rtt_column": UNDEFINED,
"active_flg": "N",
}


class ControlSchedule(BaseUpdatableModel):
pipeline_id: str
pipeline_name: str
pipeline_type: str
tracking: str
update_date: datetime
active_flg: bool
primary_id: int


SCH_DEFAULT: dict[str, Any] = {
"pipeline_id": UNDEFINED,
"pipeline_name": UNDEFINED,
"pipeline_type": UNDEFINED,
"tracking": UNDEFINED,
"update_date": "1990-01-01 00:00:00",
"active_flg": "false",
"primary_id": "-1",
}
157 changes: 113 additions & 44 deletions app/core/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@
TaskMode,
reduce_text,
)
from .schemas import WTM_DEFAULT, ControlWatermark
from .schemas import (
SCH_DEFAULT,
WTM_DEFAULT,
ControlSchedule,
ControlWatermark,
)
from .statements import (
ControlStatement,
FunctionStatement,
Expand Down Expand Up @@ -260,11 +265,19 @@ def exists(self) -> bool:
def create(self) -> None:
"""Push create statement to target database."""
if self.type not in ("func", "view", "mview"):
raise ValueError(
raise FuncRaiseError(
f"Function type {self.type!r} does not support create"
)
query_execute(self.statement(), parameters=True)

def drop(self) -> None:
if self.type == "func":
query_execute(self.statement_func_drop(), parameters=True)
else:
raise FuncRaiseError(
f"Function type {self.type!r} does not support drop"
)

def explain(self):
if self.type != "query":
raise FuncRaiseError(
Expand Down Expand Up @@ -361,7 +374,7 @@ def create(
cascade: bool = False,
) -> Self:
"""Execute create statement to target database."""
statements: list = [self.statement_create()]
statements: list[str] = [self.statement_create()]
if force_drop:
rows: int = self.count()
log: str = f"with {rows} row{get_plural(rows)} " if rows > 0 else ""
Expand All @@ -381,8 +394,6 @@ def create(
query_transaction(statements, parameters=True)
return self

def create_backup(self): ...

def create_partition(self): ...

def drop(
Expand Down Expand Up @@ -511,20 +522,22 @@ def __init__(self, **data):

def __validate_create(self) -> None:
_auto_create: bool = must_bool(
self.fwk_params.task_params.others.get("auto_create", "Y"),
self.fwk_params.task_params.others.get("auto_create", "N"),
force_raise=True,
)
_auto_init: bool = must_bool(
self.fwk_params.task_params.others.get("auto_init", "N"),
force_raise=True,
)
if not self.exists():
if not _auto_create:
raise TableNotFound(
"Please set `tbl_auto_create` be True or setup "
"this table with API."
"Please set `auto_create` be True or setup via API."
)
logger.info(
f"Auto create {self.name!r} in database because "
f"`auto_create` was set be True"
)
logger.info(f"Auto create {self.name!r} because set auto flag")
self.create()
if _auto_init:
self.init()
if (
self.watermark.table_name == UNDEFINED
and self.fwk_params.run_mode != TaskComponent.RECREATED
Expand Down Expand Up @@ -610,6 +623,10 @@ def delete_with_date(
del_date: str,
del_mode: Optional[str] = None,
) -> int:
if len(self.watermark.rtt_column) > 1:
raise CatalogArgumentError(
"Delete with date does not support for multi rtt columns."
)
_primary_key: list[str] = self.profile.primary_key
_primary_key_group: str = ",".join(
[str(_) for _ in range(1, len(_primary_key) + 1)]
Expand All @@ -629,7 +646,6 @@ def delete_with_date(
return query_select_row(
_stm,
parameters={
# TODO: rtt_column must be list of 1 column value
"ctr_rtt_col": self.watermark.rtt_column[0],
"ctr_rtt_value": self.watermark.rtt_value,
"del_operation": ("<" if del_mode == "rtt" else ">="),
Expand Down Expand Up @@ -665,20 +681,8 @@ def pull_max_data_date(self, default: bool = True) -> Optional[date]:
).get("max_date", _default_value)
)

def create(
self,
force_drop: bool = False,
cascade: bool = False,
) -> Self:
super().create(force_drop, cascade=cascade)
if (
must_bool(
self.fwk_params.task_params.others.get("auto_init", "N"),
force_raise=True,
)
and force_drop
and (init := self.initial)
):
def init(self) -> Self:
if init := self.initial:
_start_time: datetime = self.fwk_params.checkpoint()
rows: int = self.__execute(init, force_sql=True)
self.make_log(
Expand Down Expand Up @@ -1254,20 +1258,76 @@ def ingest(self) -> tuple[int, int]:
return _rs


class Pipeline(PipelineCatalog):
class BasePipeline(MapParameterService, PipelineCatalog):
"""Pipeline Service Model."""

def nodes(self): ...
watermark: ControlSchedule = Field(
default_factory=dict,
description="Pipeline watermark data from Control Task Schedule",
)

def log_push(self): ...
@validator("watermark", pre=True, always=True)
def __prepare_watermark(cls, value: DictKeyStr, values):
try:
wtm: DictKeyStr = SCH_DEFAULT | cls.pull_watermarks(
pipe_id=values["name"]
)
except DatabaseProcessError:
wtm = SCH_DEFAULT
return ControlSchedule.parse_obj(wtm | value)

def log_fetch(self): ...
def nodes(self): ...

def check_triggered(self): ...

def check_scheduled(self): ...

def push(self): ...
@classmethod
def pull_watermarks(
cls,
pipe_id: Optional[str] = None,
included_cols: Optional[list] = None,
all_flag: bool = False,
):
"""Pull tacking data from the Control Task Schedule."""
if pipe_id:
_pipe_id: str = pipe_id
elif all_flag:
_pipe_id: str = "*"
else:
raise ObjectBaseError(
"Pull Task Schedule should pass pipeline id or check all flag."
)
return Control("ctr_task_schedule").pull(
pm_filter={"pipeline_id": _pipe_id},
included=included_cols,
all_flag=all_flag,
)

def make_watermark(self, values: Optional[dict] = None):
return Control("ctr_task_schedule").create(
values=(
{
"pipeline_id": self.id,
"pipeline_name": self.name,
"pipeline_type": self.schedule,
"tracking": "SUCCESS",
"active_flg": "true",
}
| (values or {})
)
)

def push(self, values: Optional[dict] = None):
"""Update tacking information to the Control Task Schedule."""
return Control("ctr_task_schedule").push(
values=(
{"pipeline_id": self.id, "tracking": "SUCCESS"} | (values or {})
)
)


class Pipeline(BasePipeline): ...


class Task(BaseTask):
Expand Down Expand Up @@ -1417,20 +1477,29 @@ def __str__(self) -> str:
@classmethod
def params(cls, module: Optional[str] = None) -> DictKeyStr:
logger.debug("Loading params from `ctr_data_parameter` by Control ...")
_results: dict = {
value["param_name"]: (
ast.literal_eval(value["param_value"])
if value["param_type"] in {"list", "dict"}
else getattr(builtins, value["param_type"])(
value["param_value"]
_results: dict[str, Any] = {}
try:
_results: dict[str, Any] = {
value["param_name"]: (
ast.literal_eval(value["param_value"])
if value["param_type"] in ("list", "dict")
else getattr(builtins, value["param_type"])(
value["param_value"]
)
)
for value in query_select(
cls.statement_params(),
parameters=reduce_value_pairs(
{"module_type": (module or "*")}
),
)
}
except DatabaseProcessError:
logger.warning(
"Control Data Parameter does not exists, so, it will use empty."
)
for value in query_select(
cls.statement_params(),
parameters=reduce_value_pairs({"module_type": (module or "*")}),
)
}
# Calculate special parameters that logic was provided by vendor.

# NOTE: Calculate special parameters that logic was provided by vendor.
proportion_value: int = _results.get("proportion_value", 3)
proportion_inc_curr_m: str = _results.get(
"proportion_inc_current_month_flag", "N"
Expand Down
Loading

0 comments on commit ee4346e

Please sign in to comment.