Skip to content

Commit

Permalink
🎯 feat: add test task for create custome task logging to table.
Browse files Browse the repository at this point in the history
  • Loading branch information
korawica committed May 8, 2024
1 parent 367b3bc commit f69fef0
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 64 deletions.
34 changes: 12 additions & 22 deletions app/controls.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ def push_func_setup(task: Optional[Task] = None) -> None:
task: Task = task or Task.make(module="function_setup")
for idx, _func_prop in enumerate(registers.functions, start=1):
_func: Action = Action.parse_name(fullname=_func_prop["name"])
logger.info(
f"START {idx:02d}: {_func.name} "
f'{"~" * (30 - len(_func.name) + 31)}'
)

logger.info(f"START {idx:02d}: {f'{_func.name} ':~<30}")
if not _func.exists():
_func.create()
logger.info(
Expand All @@ -93,10 +89,7 @@ def push_ctr_setup(
):
status: Status = Status.SUCCESS
_node = Node.parse_name(fullname=_ctr_prop["name"])
logger.info(
f"START {idx:02d}: {_node.name} {'~' * (30 - len(_node.name) + 31)}"
)

logger.info(f"START {idx:02d}: {f'{_node.name} ':~<30}")
if not _node.exists():
if _node.name in (
"ctr_data_logging",
Expand Down Expand Up @@ -288,7 +281,7 @@ def push_load_file_to_db(
target: str,
truncate: bool = False,
compress: Optional[str] = None,
):
) -> int:
"""Push load csv file to target table with short name.
Examples:
Expand All @@ -309,15 +302,12 @@ def push_testing() -> None:
Schema().create()

logger.info("Start Testing ...")

# for _, _ctr_prop in enumerate(
# registers.control_frameworks,
# start=1,
# ):
# node = Node.parse_name(fullname=_ctr_prop["name"])
# if not node.exists():
# node.create()

from app.core.services import Control

print(Control.params())
task: Task = Task.make(module="push_testing")
print(task)
task.start()
for _, _ctr_prop in enumerate(registers.control_frameworks, start=1):
node: Node = Node.parse_name(fullname=_ctr_prop["name"])
if not node.exists():
node.create()
break
task.finish()
6 changes: 5 additions & 1 deletion app/core/__legacy/convertor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,14 @@ def reduce_in_value(value: Union[str, int, list]) -> str:
return f"({reduce_value(value)})"


def reduce_value(value: Union[str, int]) -> str:
# [x] Migrate to ./statements file
def reduce_value(value: Optional[Union[str, int]] = None) -> str:
if value is None:
return "null"
return value if value in {"null", "true", "false", "*"} else f"'{value}'"


# [x] Migrate to ./statements file
def reduce_value_pairs(value_pairs: dict) -> dict:
return {
col: col if value == "*" else reduce_value(value)
Expand Down
16 changes: 12 additions & 4 deletions app/core/__legacy/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def check_schema_exists(schema_name: str) -> bool:
)


# [x] Migrate to modern style by `Schema` Model
def check_ai_exists() -> bool:
return check_schema_exists(schema_name=env.get("AI_SCHEMA", "ai"))

Expand Down Expand Up @@ -240,6 +241,7 @@ class Control:

__slots__ = ("ctr", "ctr_cols", "ctr_cols_exc_pk", "ctr_pk")

# [x] Migrate to modern style by `Control` service
@classmethod
def parameters(cls, module: Optional[str] = None) -> dict:
"""Get all parameters with `module` argument from `ctr_data_parameter`
Expand Down Expand Up @@ -426,11 +428,17 @@ def pull(
},
)

def push(self, push_values: dict, condition: Optional[str] = None) -> int:
def push(
self,
push_values: dict,
condition: Optional[str] = None,
) -> int:
"""Push New data to the Control Framework tables, such as.
- `ctr_data_logging`
- `ctr_task_process`
- `ctr_data_logging`
- `ctr_task_process`
:return: Return a number of row that insert to target table.
"""
_ctr_columns = filter(
lambda _col: _col not in {"primary_id"}, self.ctr_cols
Expand Down Expand Up @@ -1736,7 +1744,7 @@ def check_run_mode(obj: str, run_mode: Optional[str] = None):
)


# [x] Migrate to Action services
# [x] Migrate to `Action` services
class Action(FuncProcess):
"""Action object for control process of function object."""

Expand Down
Empty file.
5 changes: 0 additions & 5 deletions app/core/__legacy/tests/test_base.py

This file was deleted.

Empty file.
4 changes: 3 additions & 1 deletion app/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# Licensed under the MIT License. See LICENSE in the project root for
# license information.
# ------------------------------------------------------------------------------
from __future__ import annotations

import fnmatch
import importlib
import operator
Expand Down Expand Up @@ -232,7 +234,7 @@ def from_shortname(
prefix: Optional[str],
folder: str,
prefix_file: str,
) -> "LoadCatalog":
) -> LoadCatalog:
return cls(name, prefix, folder, prefix_file, shortname=True)

def __init__(
Expand Down
10 changes: 7 additions & 3 deletions app/core/connections/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,22 @@


def query_select_check(
statement: str, parameters: Optional[dict] = None
statement: str,
parameters: Optional[dict] = None,
) -> bool:
"""Enhance query function to get `check_exists` value from result."""
return eval(
query_select_one(statement, parameters=parameters)["check_exists"]
)


def query_select_row(statement: str, parameters: Optional[dict] = None) -> int:
def query_select_row(
statement: str,
parameters: Optional[dict] = None,
) -> int:
"""Enhance query function to get `row_number` value from result."""
if any(
_ in statement
_ in statement.lower()
for _ in {
"select count(*) as row_number from ",
"func_count_if_exists",
Expand Down
2 changes: 2 additions & 0 deletions app/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# Licensed under the MIT License. See LICENSE in the project root for
# license information.
# ------------------------------------------------------------------------------
from __future__ import annotations

from dataclasses import (
dataclass,
field,
Expand Down
4 changes: 4 additions & 0 deletions app/core/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# Licensed under the MIT License. See LICENSE in the project root for
# license information.
# ------------------------------------------------------------------------------
from __future__ import annotations

import re
from typing import (
Optional,
Expand Down Expand Up @@ -39,6 +41,8 @@ def reduce_stm(stm: str, add_row_number: bool = False) -> str:


def reduce_value(value: Union[str, int]) -> str:
if isinstance(value, list):
return f"({', '.join([reduce_value(_) for _ in value])})"
return value if value in {"null", "true", "false", "*"} else f"'{value}'"


Expand Down
30 changes: 2 additions & 28 deletions app/core/utils/reusables.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Licensed under the MIT License. See LICENSE in the project root for
# license information.
# ------------------------------------------------------------------------------
from __future__ import annotations

import ast
import hashlib
Expand Down Expand Up @@ -41,7 +42,6 @@
)


# utility function -------------------------------------------------------------
def split_str(strings, sep: str = r"\s+"):
"""
warning: does not yet work if sep is a lookahead like `(?=b)`
Expand Down Expand Up @@ -70,7 +70,6 @@ def split_str(strings, sep: str = r"\s+"):
yield match.group(1)


# utility function -------------------------------------------------------------
def isplit(source, sep=None, regex=False):
"""Generator version of str.split() :param source: source string (unicode
or bytes) :param sep: separator to split on. :param regex: if True, will
Expand Down Expand Up @@ -111,7 +110,6 @@ def isplit(source, sep=None, regex=False):
start = idx + sep_size


# utility function -------------------------------------------------------------
def split_iterable(iterable, chunk_size=None, generator_flag: bool = True):
"""
Split an iterable into mini batch with batch length of batch_number
Expand Down Expand Up @@ -147,14 +145,12 @@ def split_iterable(iterable, chunk_size=None, generator_flag: bool = True):
return _chunks


# utility function -------------------------------------------------------------
def chunks(dataframe, n):
"""Yield successive n-sized chunks from dataframe."""
for i in range(0, len(dataframe), n):
yield dataframe.iloc[i : i + n]


# utility function -------------------------------------------------------------
def rows(f, chunk_size=1024, sep="|"):
"""
Read a file where the row separator is '|' lazily
Expand All @@ -173,7 +169,6 @@ def rows(f, chunk_size=1024, sep="|"):
yield row


# utility function -------------------------------------------------------------
def merge_dicts(*dict_args) -> dict:
"""Given any number of dictionaries, shallow copy and merge into a new
dict, precedence goes to key-value pairs in latter dictionaries.
Expand All @@ -188,7 +183,6 @@ def merge_dicts(*dict_args) -> dict:
return result


# utility function -------------------------------------------------------------
def merge_lists(*list_args) -> list:
"""
usage:
Expand All @@ -201,23 +195,20 @@ def merge_lists(*list_args) -> list:
return result


# utility function -------------------------------------------------------------
def hash_string(input_value: str, num_length: int = 8) -> str:
"""Hash str input to number with SHA256 algorithm."""
return str(
int(hashlib.sha256(input_value.encode("utf-8")).hexdigest(), 16)
)[-num_length:]


# utility function -------------------------------------------------------------
def random_sting(num_length: int = 8) -> str:
"""Random string from uppercase ASCII and number 0-9."""
return "".join(
random.choices(string.ascii_uppercase + string.digits, k=num_length)
)


# utility function -------------------------------------------------------------
def path_join(full_path: AnyStr, full_join_path: str) -> AnyStr:
"""Join path with multi pardir value if set `full_join_path` be
'../../<path>'."""
Expand All @@ -232,7 +223,6 @@ def path_join(full_path: AnyStr, full_join_path: str) -> AnyStr:
return _abspath


# utility function -------------------------------------------------------------
def convert_str_list(str_list: str) -> list:
"""Get list of run_date from list string of run_date.
Expand All @@ -250,7 +240,6 @@ def convert_str_list(str_list: str) -> list:
)


# utility function -------------------------------------------------------------
def convert_str_dict(str_dict: str) -> dict:
"""Get list of run_date from list string of run_date.
Expand All @@ -268,7 +257,6 @@ def convert_str_dict(str_dict: str) -> dict:
)


# utility function -------------------------------------------------------------
def convert_str_bool(str_bool: str, force_raise: bool = False) -> bool:
"""Get boolean of input string."""
if str_bool.lower() in {"yes", "true", "t", "1", "y", "1.0"}:
Expand All @@ -280,7 +268,6 @@ def convert_str_bool(str_bool: str, force_raise: bool = False) -> bool:
return False


# utility function -------------------------------------------------------------
def sort_by_priority_list(values: Iterable, priority: list) -> list:
"""Sorts an iterable according to a list of priority items.
Expand All @@ -291,24 +278,17 @@ def sort_by_priority_list(values: Iterable, priority: list) -> list:
>> sort_by_priority_list(values=set([1,2,3]), priority=[2,3])
[2, 3, 1]
"""
# priority_dict = {k: i for i, k in enumerate(priority)}
#
# def priority_getter(value):
# return priority_dict.get(value, len(values))
#
# return sorted(values, key=priority_getter)
priority_dict = defaultdict(
lambda: len(priority),
zip(
priority,
range(len(priority)),
),
)
priority_getter = priority_dict.__getitem__ # dict.get(key)
priority_getter = priority_dict.__getitem__
return sorted(values, key=priority_getter)


# utility function -------------------------------------------------------------
def only_one(
check_list: list, match_list: list, default: bool = True
) -> Optional:
Expand All @@ -331,21 +311,18 @@ def only_one(
)


# utility function -------------------------------------------------------------
def must_list(value: Optional[Union[str, list]] = None) -> list:
if value:
return convert_str_list(value) if isinstance(value, str) else value
return []


# utility function -------------------------------------------------------------
def must_dict(value: Optional[Union[str, dict]] = None) -> dict:
if value:
return convert_str_dict(value) if isinstance(value, str) else value
return {}


# utility function -------------------------------------------------------------
def must_bool(
value: Optional[Union[str, int, bool]] = None, force_raise: bool = False
) -> bool:
Expand All @@ -363,7 +340,6 @@ def must_bool(
return False


# utility function -------------------------------------------------------------
def to_snake_case(value: str):
"""
Usage
Expand All @@ -374,12 +350,10 @@ def to_snake_case(value: str):
return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name).lower()


# utility function -------------------------------------------------------------
def to_pascal_case(value: str, joined: str = ""):
return joined.join(word.title() for word in value.split("_"))


# utility function -------------------------------------------------------------
def to_camel_case(value: str):
return "".join(
word.title() if index_word > 0 else word
Expand Down

0 comments on commit f69fef0

Please sign in to comment.