diff --git a/app/core/__legacy/base.py b/app/core/__legacy/base.py index 685168c..7ceae7b 100644 --- a/app/core/__legacy/base.py +++ b/app/core/__legacy/base.py @@ -1200,7 +1200,7 @@ def __repr__(self): def __str__(self): return self.pipe_name - # [x] Migrate to modern style + # [x] Migrate to modern style `PiplineFrontend` @property def catalog(self) -> dict: return { diff --git a/app/core/__legacy/objects.py b/app/core/__legacy/objects.py index e1155be..b3004df 100644 --- a/app/core/__legacy/objects.py +++ b/app/core/__legacy/objects.py @@ -124,6 +124,7 @@ def get_time_checkpoint( return get_run_date(date_type=(date_type or "date_time")) +# [x] Migrate to Node.split_choose def split_choose(choose: Union[str, list]) -> tuple[list, list]: processes: list = ( list(set(choose)) if isinstance(choose, list) else [choose] @@ -679,6 +680,7 @@ def pull_tbl_retention_date( date_type="date", ) + # [x] Migrate to modern on service `Node.profile.features` def pull_tbl_columns_datatype(self) -> dict: """Pull name and data type of all columns of table in database.""" return { @@ -907,6 +909,7 @@ def push_tbl_diff_merge(self, merge_cols: dict): # 'mapping_select': ', '.join(mapping_col_select) # }) + # [x] Migrate to `NodeManage.vacuum` def push_tbl_vacuum(self, option: Optional[list] = None) -> None: _option: str = ", ".join(option) if option else "full" return query_execute( @@ -914,6 +917,7 @@ def push_tbl_vacuum(self, option: Optional[list] = None) -> None: parameters={"table_name": self.tbl_name, "option": _option}, ) + # [x] Migrate to modern on service `Node.delete_with_condition` def push_tbl_del_with_condition(self, condition: str) -> int: return query_select_row( reduce_stm( @@ -1470,16 +1474,16 @@ def __init__( tbl_auto_init=auto_init, verbose=verbose, ) + + # [x] Migrate to Node.split_choose self.node_tbl_ps_included: list = ( [_ for _ in self.tbl_process.keys() if _ in _node_tbl_ps_included] if _node_tbl_ps_included else list(self.tbl_process.keys()) ) + self.node_start_datetime: dt.datetime = get_time_checkpoint() - self.node_tbl_run_check: bool = ( - self.tbl_ctr_run_count_now < self.tbl_ctr_run_count_max - or self.tbl_ctr_run_count_max == 0 - ) + # [x] Migrate to Node.__validate_quota if ( self.tbl_run_date < self.tbl_ctr_run_date @@ -1639,10 +1643,12 @@ def process_start(self) -> dict: ) return _row_record + # [x] Migrate to modern on service `Node.ext_params` @property def retention_mode(self) -> str: return self.node_tbl_params.get("data_retention_mode", "data_date") + # [x] Migrate to modern on service `Node.retention_date` @property def retention_date(self) -> dt.date: return self.pull_tbl_retention_date(rtt_mode=self.retention_mode) diff --git a/app/core/services.py b/app/core/services.py index 632ec98..5318254 100644 --- a/app/core/services.py +++ b/app/core/services.py @@ -87,6 +87,7 @@ split_iterable, ) from .validators import ( + Choose, FrameworkParameter, MapParameter, ReleaseDate, @@ -497,7 +498,7 @@ def count(self, condition: Optional[Union[str, list]] = None) -> int: class Node(BaseNode): - choose: list[str] = Field(default_factory=list) + choose: list[str] = Field(default_factory=list, description="Node choose") watermark: ControlWatermark = Field( default_factory=dict, description="Node watermark data from Control Pipeline", @@ -550,6 +551,24 @@ def __validate_quota(self) -> None: f"running date: {self.watermark.run_date:'%Y-%m-%d'}" ) + @property + def split_choose(self) -> Choose: + _process: dict[str, list[str]] = {"reject": [], "filter": []} + for process in self.choose: + if process.startswith("!"): + _process["reject"].append(process.split("!")[-1]) + else: + _process["filter"].append(process) + _filter: list[str] = _process["filter"] + return Choose( + included=( + [_ for _ in self.process.keys() if _ in _filter] + if _filter + else list(self.process.keys()) + ), + excluded=_process["reject"], + ) + @validator("watermark", pre=True, always=True) def __prepare_watermark(cls, value: DictKeyStr, values): try: @@ -560,6 +579,10 @@ def __prepare_watermark(cls, value: DictKeyStr, values): wtm = WTM_DEFAULT return ControlWatermark.parse_obj(wtm | value) + @validator("choose", pre=True, always=True) + def __prepare_choose(cls, value: Union[str, list[str]]) -> list[str]: + return list(set(value)) if isinstance(value, list) else [value] + def watermark_refresh(self): logger.debug("Add more external parameters ...") self.__dict__["watermark"] = ControlWatermark.parse_obj( @@ -574,6 +597,14 @@ def process_date(self) -> date: date_type="date", ) + def delete_with_condition(self, condition: str) -> int: + return query_select_row( + reduce_stm( + PARAMS.ps_stm.push_del_with_condition, add_row_number=False + ), + parameters={"table_name": self.name, "condition": condition}, + ) + def delete_with_date( self, del_date: str, @@ -838,7 +869,7 @@ def _prepare_before_rerun(self, sla: int) -> tuple[date, date]: ) return _run_date, _data_date - def process_run_count(self, row_record: dict): + def process_run_count(self, row_record: dict) -> int: return ( int(float(self.watermark.run_count_now)) + 1 if ( @@ -848,9 +879,9 @@ def process_run_count(self, row_record: dict): else 0 ) - def process_count(self): - _excluded: list = self.node_tbl_ps_excluded - if _included := self.node_tbl_ps_included: + def process_count(self) -> int: + _excluded: list = self.split_choose.excluded + if _included := self.split_choose.included: return len(_included) - len( set(_included).intersection(set(_excluded)) ) @@ -872,8 +903,8 @@ def process_start(self) -> dict[int, int]: ) _run_date, _data_date = self._prepare_before_rerun(sla=_ps_sla) _row_record: dict[int, int] = self.execute( - included=self.node_tbl_ps_included, - excluded=self.node_tbl_ps_excluded, + included=self.split_choose.included, + excluded=self.split_choose.excluded, act_type=self.fwk_params.run_mode, params=( _additional @@ -1056,6 +1087,10 @@ def retention(self): ) ) + def vacuum(self, options: Optional[list[str]] = None) -> None: + _option: str = ", ".join(options) if options else "full" + return query_execute(self.statement_vacuum(_option), parameters=True) + class NodeLocal(Node): """Node for Local File loading.""" diff --git a/app/core/statements.py b/app/core/statements.py index 89ca554..f43c5e2 100644 --- a/app/core/statements.py +++ b/app/core/statements.py @@ -341,6 +341,11 @@ def statement_transfer(self, name: Optional[str] = None) -> str: f"FROM {{database_name}}.{{ai_schema_name}}.{self.name})" ) + def statement_vacuum(self, option: str) -> str: + return reduce_stm( + f"vacuum {option} {{database_name}}.{{ai_schema_name}}.{self.name}" + ) + def conflict_set( self, excluded: Optional[list] = None, diff --git a/app/core/validators.py b/app/core/validators.py index 8b644ab..0b5d88c 100644 --- a/app/core/validators.py +++ b/app/core/validators.py @@ -75,6 +75,7 @@ "ReleaseDate", "Task", "MapParameter", + "Choose", "FrameworkParameter", ) @@ -1509,6 +1510,11 @@ def checkpoint(date_type: Optional[str] = None): return get_run_date(date_type=(date_type or "date_time")) +class Choose(BaseUpdatableModel): + included: list[str] + excluded: list[str] + + class MapParameter(BaseUpdatableModel): fwk_params: FrameworkParameter = Field( default_factory=dict,