Skip to content

Commit

Permalink
🎯 feat: migrate legacy node to service without diff check.
Browse files Browse the repository at this point in the history
  • Loading branch information
korawica committed May 11, 2024
1 parent b22c7ef commit 4b7b108
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 12 deletions.
2 changes: 1 addition & 1 deletion app/core/__legacy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ def __repr__(self):
def __str__(self):
return self.pipe_name

# [x] Migrate to modern style
# [x] Migrate to modern style `PiplineFrontend`
@property
def catalog(self) -> dict:
return {
Expand Down
14 changes: 10 additions & 4 deletions app/core/__legacy/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def get_time_checkpoint(
return get_run_date(date_type=(date_type or "date_time"))


# [x] Migrate to Node.split_choose
def split_choose(choose: Union[str, list]) -> tuple[list, list]:
processes: list = (
list(set(choose)) if isinstance(choose, list) else [choose]
Expand Down Expand Up @@ -679,6 +680,7 @@ def pull_tbl_retention_date(
date_type="date",
)

# [x] Migrate to modern on service `Node.profile.features`
def pull_tbl_columns_datatype(self) -> dict:
"""Pull name and data type of all columns of table in database."""
return {
Expand Down Expand Up @@ -907,13 +909,15 @@ def push_tbl_diff_merge(self, merge_cols: dict):
# 'mapping_select': ', '.join(mapping_col_select)
# })

# [x] Migrate to `NodeManage.vacuum`
def push_tbl_vacuum(self, option: Optional[list] = None) -> None:
_option: str = ", ".join(option) if option else "full"
return query_execute(
reduce_stm(params.ps_stm.push_vacuum),
parameters={"table_name": self.tbl_name, "option": _option},
)

# [x] Migrate to modern on service `Node.delete_with_condition`
def push_tbl_del_with_condition(self, condition: str) -> int:
return query_select_row(
reduce_stm(
Expand Down Expand Up @@ -1470,16 +1474,16 @@ def __init__(
tbl_auto_init=auto_init,
verbose=verbose,
)

# [x] Migrate to Node.split_choose
self.node_tbl_ps_included: list = (
[_ for _ in self.tbl_process.keys() if _ in _node_tbl_ps_included]
if _node_tbl_ps_included
else list(self.tbl_process.keys())
)

self.node_start_datetime: dt.datetime = get_time_checkpoint()
self.node_tbl_run_check: bool = (
self.tbl_ctr_run_count_now < self.tbl_ctr_run_count_max
or self.tbl_ctr_run_count_max == 0
)

# [x] Migrate to Node.__validate_quota
if (
self.tbl_run_date < self.tbl_ctr_run_date
Expand Down Expand Up @@ -1639,10 +1643,12 @@ def process_start(self) -> dict:
)
return _row_record

# [x] Migrate to modern on service `Node.ext_params`
@property
def retention_mode(self) -> str:
return self.node_tbl_params.get("data_retention_mode", "data_date")

# [x] Migrate to modern on service `Node.retention_date`
@property
def retention_date(self) -> dt.date:
return self.pull_tbl_retention_date(rtt_mode=self.retention_mode)
Expand Down
49 changes: 42 additions & 7 deletions app/core/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
split_iterable,
)
from .validators import (
Choose,
FrameworkParameter,
MapParameter,
ReleaseDate,
Expand Down Expand Up @@ -497,7 +498,7 @@ def count(self, condition: Optional[Union[str, list]] = None) -> int:

class Node(BaseNode):

choose: list[str] = Field(default_factory=list)
choose: list[str] = Field(default_factory=list, description="Node choose")
watermark: ControlWatermark = Field(
default_factory=dict,
description="Node watermark data from Control Pipeline",
Expand Down Expand Up @@ -550,6 +551,24 @@ def __validate_quota(self) -> None:
f"running date: {self.watermark.run_date:'%Y-%m-%d'}"
)

@property
def split_choose(self) -> Choose:
_process: dict[str, list[str]] = {"reject": [], "filter": []}
for process in self.choose:
if process.startswith("!"):
_process["reject"].append(process.split("!")[-1])
else:
_process["filter"].append(process)
_filter: list[str] = _process["filter"]
return Choose(
included=(
[_ for _ in self.process.keys() if _ in _filter]
if _filter
else list(self.process.keys())
),
excluded=_process["reject"],
)

@validator("watermark", pre=True, always=True)
def __prepare_watermark(cls, value: DictKeyStr, values):
try:
Expand All @@ -560,6 +579,10 @@ def __prepare_watermark(cls, value: DictKeyStr, values):
wtm = WTM_DEFAULT
return ControlWatermark.parse_obj(wtm | value)

@validator("choose", pre=True, always=True)
def __prepare_choose(cls, value: Union[str, list[str]]) -> list[str]:
return list(set(value)) if isinstance(value, list) else [value]

def watermark_refresh(self):
logger.debug("Add more external parameters ...")
self.__dict__["watermark"] = ControlWatermark.parse_obj(
Expand All @@ -574,6 +597,14 @@ def process_date(self) -> date:
date_type="date",
)

def delete_with_condition(self, condition: str) -> int:
return query_select_row(
reduce_stm(
PARAMS.ps_stm.push_del_with_condition, add_row_number=False
),
parameters={"table_name": self.name, "condition": condition},
)

def delete_with_date(
self,
del_date: str,
Expand Down Expand Up @@ -838,7 +869,7 @@ def _prepare_before_rerun(self, sla: int) -> tuple[date, date]:
)
return _run_date, _data_date

def process_run_count(self, row_record: dict):
def process_run_count(self, row_record: dict) -> int:
return (
int(float(self.watermark.run_count_now)) + 1
if (
Expand All @@ -848,9 +879,9 @@ def process_run_count(self, row_record: dict):
else 0
)

def process_count(self):
_excluded: list = self.node_tbl_ps_excluded
if _included := self.node_tbl_ps_included:
def process_count(self) -> int:
_excluded: list = self.split_choose.excluded
if _included := self.split_choose.included:
return len(_included) - len(
set(_included).intersection(set(_excluded))
)
Expand All @@ -872,8 +903,8 @@ def process_start(self) -> dict[int, int]:
)
_run_date, _data_date = self._prepare_before_rerun(sla=_ps_sla)
_row_record: dict[int, int] = self.execute(
included=self.node_tbl_ps_included,
excluded=self.node_tbl_ps_excluded,
included=self.split_choose.included,
excluded=self.split_choose.excluded,
act_type=self.fwk_params.run_mode,
params=(
_additional
Expand Down Expand Up @@ -1056,6 +1087,10 @@ def retention(self):
)
)

def vacuum(self, options: Optional[list[str]] = None) -> None:
_option: str = ", ".join(options) if options else "full"
return query_execute(self.statement_vacuum(_option), parameters=True)


class NodeLocal(Node):
"""Node for Local File loading."""
Expand Down
5 changes: 5 additions & 0 deletions app/core/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,11 @@ def statement_transfer(self, name: Optional[str] = None) -> str:
f"FROM {{database_name}}.{{ai_schema_name}}.{self.name})"
)

def statement_vacuum(self, option: str) -> str:
return reduce_stm(
f"vacuum {option} {{database_name}}.{{ai_schema_name}}.{self.name}"
)

def conflict_set(
self,
excluded: Optional[list] = None,
Expand Down
6 changes: 6 additions & 0 deletions app/core/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
"ReleaseDate",
"Task",
"MapParameter",
"Choose",
"FrameworkParameter",
)

Expand Down Expand Up @@ -1509,6 +1510,11 @@ def checkpoint(date_type: Optional[str] = None):
return get_run_date(date_type=(date_type or "date_time"))


class Choose(BaseUpdatableModel):
included: list[str]
excluded: list[str]


class MapParameter(BaseUpdatableModel):
fwk_params: FrameworkParameter = Field(
default_factory=dict,
Expand Down

0 comments on commit 4b7b108

Please sign in to comment.