diff --git a/src/pydolphinscheduler/cli/commands.py b/src/pydolphinscheduler/cli/commands.py index 6a09326..eeb3521 100644 --- a/src/pydolphinscheduler/cli/commands.py +++ b/src/pydolphinscheduler/cli/commands.py @@ -27,6 +27,7 @@ set_single_config, ) from pydolphinscheduler.core.yaml_workflow import create_workflow +from pydolphinscheduler.cli.tenants import tenants version_option_val = ["major", "minor", "micro"] @@ -104,3 +105,6 @@ def config(getter, setter, init) -> None: def yaml(yaml_file) -> None: """Create workflow using YAML file.""" create_workflow(yaml_file) + + +cli.add_command(tenants) diff --git a/src/pydolphinscheduler/cli/tenants.py b/src/pydolphinscheduler/cli/tenants.py new file mode 100644 index 0000000..08b9f4b --- /dev/null +++ b/src/pydolphinscheduler/cli/tenants.py @@ -0,0 +1,64 @@ +import click + +from pydolphinscheduler import configuration +from pydolphinscheduler.models.tenant import Tenant + + +@click.help_option() +def tenants() -> click.group: + """Users subcommand group.""" + + +@click.group() +def tenants() -> click.group: + """Users subcommand group.""" + + +@tenants.command() +@click.option( + "-n", "--name", "name", + required=True, + type=str, +) +@click.option( + "-q", "--queue-name", "queue_name", + required=True, + type=str, +) +@click.option( + "-d", "--description", "description", + required=True, + type=str, +) +def create(name, queue_name, description): + tenant = Tenant.get(name) + if tenant: + click.echo(f"Tenant with name {name} already exists.", err=True) + new_tenant = Tenant.create(name, queue_name, description) + click.echo(f"Tenant {new_tenant.name} had been created.") + + +@tenants.command() +@click.option( + "-n", "--name", "name", + required=True, + type=str, +) +def delete(name): + tenant = Tenant.delete(name) + if not tenant: + click.echo(f"Tenant with name {name} not exists.", err=True) + click.echo(f"Tenant: {tenant}.") + + +@tenants.command() +@click.option( + "-n", "--name", "name", + required=True, + type=str, +) +def get(name): + tenant = Tenant.get(name) + if not tenant: + click.echo(f"Tenant with name {name} not exists.", err=True) + click.echo(f"Tenant: {tenant}.") diff --git a/src/pydolphinscheduler/cli/users.py b/src/pydolphinscheduler/cli/users.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pydolphinscheduler/core/workflow.py b/src/pydolphinscheduler/core/workflow.py index 02906fb..bda0448 100644 --- a/src/pydolphinscheduler/core/workflow.py +++ b/src/pydolphinscheduler/core/workflow.py @@ -27,7 +27,7 @@ from pydolphinscheduler.core.resource_plugin import ResourcePlugin from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException from pydolphinscheduler.java_gateway import gateway -from pydolphinscheduler.models import Base, Project, Tenant, User +from pydolphinscheduler.models import Base, Project, User from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, conv_to_schedule @@ -96,7 +96,6 @@ class Workflow(Base): "name", "description", "_project", - "_tenant", "worker_group", "warning_type", "warning_group_id", @@ -120,7 +119,6 @@ def __init__( timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE, user: Optional[str] = configuration.WORKFLOW_USER, project: Optional[str] = configuration.WORKFLOW_PROJECT, - tenant: Optional[str] = configuration.WORKFLOW_TENANT, worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP, warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE, warning_group_id: Optional[int] = 0, @@ -130,6 +128,8 @@ def __init__( param: Optional[Dict] = None, resource_plugin: Optional[ResourcePlugin] = None, resource_list: Optional[List[Resource]] = None, + *args, + **kwargs, ): super().__init__(name, description) self.schedule = schedule @@ -138,7 +138,6 @@ def __init__( self.timezone = timezone self._user = user self._project = project - self._tenant = tenant self.worker_group = worker_group self.warning_type = warning_type if warning_type.strip().upper() not in ("FAILURE", "SUCCESS", "ALL", "NONE"): @@ -176,16 +175,6 @@ def __enter__(self) -> "Workflow": def __exit__(self, exc_type, exc_val, exc_tb) -> None: WorkflowContext.delete() - @property - def tenant(self) -> Tenant: - """Get attribute tenant.""" - return Tenant(self._tenant) - - @tenant.setter - def tenant(self, tenant: Tenant) -> None: - """Set attribute tenant.""" - self._tenant = tenant.name - @property def project(self) -> Project: """Get attribute project.""" @@ -202,7 +191,7 @@ def user(self) -> User: For now we just get from python models but not from java gateway models, so it may not correct. """ - return User(name=self._user, tenant=self._tenant) + return User(name=self._user) @staticmethod def _parse_datetime(val: Any) -> Any: @@ -359,7 +348,7 @@ def get_tasks_by_name(self, name: str) -> Set["Task"]: # noqa: F821 """Get tasks object by given name, if will return all tasks with this name.""" find = set() for task in self.tasks.values(): - if task.name == name: + if task.name_chd == name: find.add(task) return find @@ -436,7 +425,6 @@ def submit(self) -> int: self.execution_type, self.timeout, self.worker_group, - self._tenant, self.release_state, # TODO add serialization function json.dumps(self.task_relation_json), diff --git a/src/pydolphinscheduler/decorators.py b/src/pydolphinscheduler/decorators.py new file mode 100644 index 0000000..632aa8b --- /dev/null +++ b/src/pydolphinscheduler/decorators.py @@ -0,0 +1,23 @@ +from functools import wraps +from typing import Callable + +from pydolphinscheduler import configuration + + +def provide_user(func: Callable) -> Callable: + """Decorator provides user from configuration when user is not provided. + + If you want to reuse a session or run the function as part of a + database transaction, you pass it to the function, if not this wrapper + will create one and close it for you. + """ + session_args_idx = find_session_idx(func) + + @wraps(func) + def wrapper(*args, **kwargs) -> Callable: + if "user" in kwargs or session_args_idx < len(args): + return func(*args, **kwargs) + else: + return func(*args, user=configuration.USER_NAME, **kwargs) + + return wrapper \ No newline at end of file diff --git a/src/pydolphinscheduler/examples/tutorial.py b/src/pydolphinscheduler/examples/tutorial.py index cb6d47f..fc6aa31 100644 --- a/src/pydolphinscheduler/examples/tutorial.py +++ b/src/pydolphinscheduler/examples/tutorial.py @@ -47,22 +47,24 @@ start_time="2021-01-01", tenant="tenant_exists", ) as pd: + +wf = Workflow() # [end workflow_declare] # [start task_declare] - task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler") - task_child_one = Shell(name="task_child_one", command="echo 'child one'") - task_child_two = Shell(name="task_child_two", command="echo 'child two'") - task_union = Shell(name="task_union", command="echo union") - # [end task_declare] +task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler", workflow=wf) +task_child_one = Shell(name="task_child_one", command="echo 'child one'") +task_child_two = Shell(name="task_child_two", command="echo 'child two'") +task_union = Shell(name="task_union", command="echo union") +# [end task_declare] - # [start task_relation_declare] - task_group = [task_child_one, task_child_two] - task_parent.set_downstream(task_group) +# [start task_relation_declare] +task_group = [task_child_one, task_child_two] +task_parent.set_downstream(task_group) - task_union << task_group - # [end task_relation_declare] +task_union << task_group +# [end task_relation_declare] - # [start submit_or_run] - pd.run() +# [start submit_or_run] +pd.submit() # [end submit_or_run] # [end tutorial] diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py index 9ec039a..74dd70d 100644 --- a/src/pydolphinscheduler/java_gateway.py +++ b/src/pydolphinscheduler/java_gateway.py @@ -23,7 +23,7 @@ from typing import Any, Optional from py4j.java_collections import JavaMap -from py4j.java_gateway import GatewayParameters, JavaGateway +from py4j.java_gateway import GatewayParameters, JavaGateway, JavaObject from py4j.protocol import Py4JError from pydolphinscheduler import __version__, configuration @@ -160,11 +160,11 @@ def delete_project(self, user: str, code: int): return self.gateway.entry_point.deleteProject(user, code) def create_tenant( - self, tenant_name: str, queue_name: str, description: Optional[str] = None - ): + self, tenant_code: str, queue_name: str, description: Optional[str] = None + ) -> JavaObject: """Create tenant through java gateway.""" return self.gateway.entry_point.createTenant( - tenant_name, description, queue_name + tenant_code, description, queue_name ) def query_tenant(self, tenant_code: str): @@ -177,19 +177,23 @@ def grant_tenant_to_user(self, user_name: str, tenant_code: str): def update_tenant( self, - user: str, - tenant_id: int, - code: str, - queue_id: int, - description: Optional[str] = None, - ): - """Update tenant through java gateway.""" + id_: int, + tenant_code: str, + queue_id: str, + description: str, + user: Optional[str] = configuration.USER_NAME, + ) -> JavaObject: + """Update tenant by tenant_code through java gateway.""" return self.gateway.entry_point.updateTenant( - user, tenant_id, code, queue_id, description + user, id_, tenant_code, queue_id, description ) - def delete_tenant(self, user: str, tenant_id: int): - """Delete tenant through java gateway.""" + def delete_tenant(self, tenant_code: str, user: Optional[str] = configuration.USER_NAME) -> None: + """Delete tenant by tenant_code through java gateway. + + :tenant_code: tenant code for identification resource + :user: username to delete tenant + """ return self.gateway.entry_point.deleteTenantById(user, tenant_id) def create_user( @@ -198,13 +202,13 @@ def create_user( password: str, email: str, phone: str, - tenant: str, + tenant_code: str, queue: str, - status: int, + state: int, ): """Create user through java gateway.""" return self.gateway.entry_point.createUser( - name, password, email, phone, tenant, queue, status + name, password, email, phone, tenant_code, queue, state ) def query_user(self, user_id: int): @@ -259,7 +263,6 @@ def create_or_update_workflow( execution_type: str, timeout: int, worker_group: str, - tenant_code: str, release_state: int, task_relation_json: str, task_definition_json: str, @@ -278,7 +281,6 @@ def create_or_update_workflow( warning_group_id, timeout, worker_group, - tenant_code, release_state, task_relation_json, task_definition_json, diff --git a/src/pydolphinscheduler/models/__init__.py b/src/pydolphinscheduler/models/__init__.py index b289954..404d11a 100644 --- a/src/pydolphinscheduler/models/__init__.py +++ b/src/pydolphinscheduler/models/__init__.py @@ -34,3 +34,4 @@ "Queue", "WorkerGroup", ] + diff --git a/src/pydolphinscheduler/models/base_side.py b/src/pydolphinscheduler/models/base_side.py index 99b4007..3843b33 100644 --- a/src/pydolphinscheduler/models/base_side.py +++ b/src/pydolphinscheduler/models/base_side.py @@ -35,14 +35,8 @@ def create_if_not_exists( # TODO comment for avoiding cycle import # user: Optional[User] = ProcessDefinitionDefault.USER user=configuration.WORKFLOW_USER, + *args, + **kwargs, ): """Create Base if not exists.""" raise NotImplementedError - - def delete_all(self): - """Delete all method.""" - if not self: - return - list_pro = [key for key in self.__dict__.keys()] - for key in list_pro: - self.__delattr__(key) diff --git a/src/pydolphinscheduler/models/meta.py b/src/pydolphinscheduler/models/meta.py new file mode 100644 index 0000000..62417cf --- /dev/null +++ b/src/pydolphinscheduler/models/meta.py @@ -0,0 +1,47 @@ +from functools import wraps +from inspect import signature +from typing import Tuple, Dict + +from py4j.java_gateway import JavaObject +from pydolphinscheduler.utils.string import snake2camel + + +class ModelMeta(type): + _FUNC_INIT = "__init__" + _PARAM_SELF = "self" + + def __new__(mcs, name: str, bases: Tuple, attrs: Dict): + + if mcs._FUNC_INIT not in attrs: + raise TypeError("Class with mateclass %s must have %s method", (mcs.__name__, mcs._FUNC_INIT)) + + sig = signature(attrs.get(mcs._FUNC_INIT)) + param = [param.name for name, param in sig.parameters.items() if name != mcs._PARAM_SELF] + + for attr_name, attr_value in attrs.items(): + if isinstance(attr_value, classmethod) and not attr_name.startswith("__"): + attrs[attr_name] = mcs.j2p(attr_value, name, attrs, param) + return super(ModelMeta, mcs).__new__(mcs, name, bases, attrs) + + @classmethod + def j2p(mcs, cm: classmethod, name: str, attrs: Dict, params=None): + @wraps(cm) + def wrapper(*args, **kwargs): + class_ = type(name, (), attrs) + + java_obj = cm.__func__(class_, *args, **kwargs) + assert isinstance(java_obj, JavaObject), "The function %s must return JavaObject" % cm.__func__.__name__ + + obj_init_params = [] + for param in params: + java_func_name = mcs.py4j_attr_func_name(param) + java_func = getattr(java_obj, java_func_name) + obj_init_params.append(java_func()) + + return class_(*obj_init_params) + + return wrapper + + @classmethod + def py4j_attr_func_name(mcs, name: str) -> str: + return snake2camel(f"get_{name}") diff --git a/src/pydolphinscheduler/models/queue.py b/src/pydolphinscheduler/models/queue.py index e6da259..132d820 100644 --- a/src/pydolphinscheduler/models/queue.py +++ b/src/pydolphinscheduler/models/queue.py @@ -20,15 +20,81 @@ from typing import Optional from pydolphinscheduler import configuration -from pydolphinscheduler.models import BaseSide +from pydolphinscheduler.java_gateway import gateway +from pydolphinscheduler.models.meta import ModelMeta -class Queue(BaseSide): - """DolphinScheduler Queue object.""" +class Queue(metaclass=ModelMeta): + """Model Queue, communicate with DolphinScheduler API server and convert Java Object into Python. + + We use metaclass :class:`pydolphinscheduler.models.meta.ModelMeta` to convert Java Object into Python. + And code in this class just call Java API method. + + :param id_: queue id, the primary key of queue table + :param queue_name: queue name, unique key, used to identify queue like get, update, delete + :param queue: Yarn queue name, deprecate after dolphinscheduler 2.0.0 + """ def __init__( self, - name: str = configuration.WORKFLOW_QUEUE, - description: Optional[str] = "", + id_: int, + queue_name: str, + queue: str, ): - super().__init__(name, description) + self.id = id_ + self.queue_name = queue_name + self.queue = queue + + @classmethod + def create(cls, queue_name: str, queue: Optional[str] = None) -> "Queue": + """Create queue. + + :param queue_name: queue name, unique key, used to identify queue like get, update, delete + :param queue: Yarn queue name, deprecate after dolphinscheduler 2.0.0 + """ + try: + cls.get(queue_name) + raise ValueError(f"Tenant {queue_name} already exists") + except ValueError: + if queue is None: + queue = queue_name + return gateway.create_queue(queue_name, queue) + + @classmethod + def get(cls, queue_name: str) -> "Queue": + """Get single queue. + + :param queue_name: queue name, unique key, used to identify queue like get, update, delete + """ + queue = gateway.get_queue(queue_name) + if queue is None: + raise ValueError(f"Tenant {queue_name} not found.") + return queue + + @classmethod + def update(cls, tenant_code: str, queue_name: Optional[str], description: Optional[str]) -> "Queue": + """Update Tenant. + + :param tenant_code: tenant code, unique key. + :param queue_name: new queue name you want to update + :param description: new description you want to update + """ + + original_tenant = cls.get(tenant_code) + if queue_name: + original_tenant.queue_name = queue_name + if description: + original_tenant.description = description + tenant = gateway.update_tenant(original_tenant.id, original_tenant.tenant_code, original_tenant.description) + return tenant + + @classmethod + def delete(cls, tenant_code) -> "Queue": + """Delete Tenant. + + :param tenant_code: tenant code you want to delete + """ + tenant = cls.get(tenant_code) + gateway.delete_tenant(tenant_code) + return tenant + diff --git a/src/pydolphinscheduler/models/tenant.py b/src/pydolphinscheduler/models/tenant.py index 146aec0..2d5b2e7 100644 --- a/src/pydolphinscheduler/models/tenant.py +++ b/src/pydolphinscheduler/models/tenant.py @@ -15,66 +15,92 @@ # specific language governing permissions and limitations # under the License. -"""DolphinScheduler Tenant object.""" +"""DolphinScheduler Tenant model.""" from typing import Optional -from pydolphinscheduler import configuration from pydolphinscheduler.java_gateway import gateway -from pydolphinscheduler.models import BaseSide +from pydolphinscheduler.models.meta import ModelMeta -class Tenant(BaseSide): - """DolphinScheduler Tenant object.""" +class Tenant(metaclass=ModelMeta): + """Model Tenant, communicate with DolphinScheduler API server and convert Java Object into Python. + + We use metaclass :class:`pydolphinscheduler.models.meta.ModelMeta` to convert Java Object into Python. + And code in this class just call Java API method. + + :param id_: tenant id, the primary key of tenant table + :param tenant_code: tenant code, unique key, used to identify tenant like get, update, delete + :param queue_id: the queue id used by this tenant + :param queue_name: the queue name used by this tenant + :param queue: queue + :param description: description of current tenant + """ def __init__( self, - name: str = configuration.WORKFLOW_TENANT, - queue: str = configuration.WORKFLOW_QUEUE, + id_: int, + tenant_code: str, + queue_id: str, + queue_name: str, + queue: str, description: Optional[str] = None, - tenant_id: Optional[int] = None, - code: Optional[str] = None, - user_name: Optional[str] = None, ): - super().__init__(name, description) - self.tenant_id = tenant_id + self.id = id_ + self.tenant_code = tenant_code + self.queue_id = queue_id + self.queue_name = queue_name self.queue = queue - self.code = code - self.user_name = user_name - - def create_if_not_exists( - self, queue_name: str, user=configuration.USER_NAME - ) -> None: - """Create Tenant if not exists.""" - tenant = gateway.create_tenant(self.name, queue_name, self.description) - self.tenant_id = tenant.getId() - self.code = tenant.getTenantCode() - # gateway_result_checker(result, None) + self.description = description @classmethod - def get_tenant(cls, code: str) -> "Tenant": - """Get Tenant list.""" - tenant = gateway.query_tenant(code) + def create(cls, tenant_code: str, queue_name: str, description: str) -> "Tenant": + """Create tenant. + + :param tenant_code: tenant code + :param queue_name: queue name + :param description: description + """ + try: + cls.get(tenant_code) + raise ValueError(f"Tenant {tenant_code} already exists") + except ValueError: + return gateway.create_tenant(tenant_code, queue_name, description) + + @classmethod + def get(cls, tenant_code: str) -> "Tenant": + """Get single Tenant. + + :param tenant_code: tenant code + """ + tenant = gateway.query_tenant(tenant_code) if tenant is None: - return cls() - return cls( - description=tenant.getDescription(), - tenant_id=tenant.getId(), - code=tenant.getTenantCode(), - queue=tenant.getQueueId(), - ) - - def update( - self, user=configuration.USER_NAME, code=None, queue_id=None, description=None - ) -> None: - """Update Tenant.""" - gateway.update_tenant(user, self.tenant_id, code, queue_id, description) - # TODO: check queue_id and queue_name - self.queue = str(queue_id) - self.code = code - self.description = description + raise ValueError(f"Tenant {tenant_code} not found.") + return tenant + + @classmethod + def update(cls, tenant_code: str, queue_name: Optional[str], description: Optional[str]) -> "Tenant": + """Update Tenant. + + :param tenant_code: tenant code, unique key. + :param queue_name: new queue name you want to update + :param description: new description you want to update + """ + + original_tenant = cls.get(tenant_code) + if queue_name: + original_tenant.queue_name = queue_name + if description: + original_tenant.description = description + tenant = gateway.update_tenant(original_tenant.id, original_tenant.tenant_code, original_tenant.description) + return tenant + + @classmethod + def delete(cls, tenant_code) -> "Tenant": + """Delete Tenant. - def delete(self) -> None: - """Delete Tenant.""" - gateway.delete_tenant(self.user_name, self.tenant_id) - self.delete_all() + :param tenant_code: tenant code you want to delete + """ + tenant = cls.get(tenant_code) + gateway.delete_tenant(tenant_code) + return tenant diff --git a/src/pydolphinscheduler/models/user.py b/src/pydolphinscheduler/models/user.py index e45f98d..0f04923 100644 --- a/src/pydolphinscheduler/models/user.py +++ b/src/pydolphinscheduler/models/user.py @@ -15,116 +15,175 @@ # specific language governing permissions and limitations # under the License. -"""DolphinScheduler User object.""" +"""DolphinScheduler User models.""" from typing import Optional +from py4j.java_gateway import JavaObject + from pydolphinscheduler import configuration from pydolphinscheduler.java_gateway import gateway -from pydolphinscheduler.models import BaseSide, Tenant +from pydolphinscheduler.models import BaseSide +from pydolphinscheduler.models.meta import ModelMeta + +class User(metaclass=ModelMeta): + """Model User, communicate with DolphinScheduler API server and convert Java Object into Python. -class User(BaseSide): - """DolphinScheduler User object.""" + We use metaclass :class:`pydolphinscheduler.models.meta.ModelMeta` to convert Java Object into Python. + And code in this class just call Java API method. - _KEY_ATTR = { - "name", - "password", - "email", - "phone", - "tenant", - "queue", - "status", - } + :param id_: user id, the primary key of tenant table + :param user_name: username, unique key, used to identify user like get, update, delete + :param user_password: user password for login + :param user_type: user type, 0: admin user, 1: general user + :param email: user email + :param phone: user phone + :param tenant_id: the id of tenant who the user belongs to + :param queue: the queue of user to use + :param state: user state, 0: inactive, 1: active + :param time_zone: user time zone, default is UTC, and can be configured with string like `Europe/Amsterdam`. + """ def __init__( self, - name: str, - password: Optional[str] = configuration.USER_PASSWORD, - email: Optional[str] = configuration.USER_EMAIL, - phone: Optional[str] = configuration.USER_PHONE, - tenant: Optional[str] = configuration.WORKFLOW_TENANT, - queue: Optional[str] = configuration.WORKFLOW_QUEUE, - status: Optional[int] = configuration.USER_STATE, + id_: int, + user_name: str, + user_password: str, + user_type: int, + email: str, + phone: str, + tenant_id: int, + queue: str, + state: int, + time_zone: Optional[str] = None, ): - super().__init__(name) - self.user_id: Optional[int] = None - self.password = password + self.id = id_ + self.user_name = user_name + self.user_password = user_password + self._user_type = user_type self.email = email self.phone = phone - self.tenant = tenant + self.tenant_id = tenant_id self.queue = queue - self.status = status - - def create_tenant_if_not_exists(self) -> None: - """Create tenant object.""" - tenant = Tenant(name=self.tenant, queue=self.queue) - tenant.create_if_not_exists(self.queue) - - def create_if_not_exists(self, **kwargs): - """Create User if not exists.""" - # Should make sure queue already exists. - self.create_tenant_if_not_exists() - user = gateway.create_user( - self.name, - self.password, - self.email, - self.phone, - self.tenant, - self.queue, - self.status, + self._state = state + self.time_zone = time_zone + + @property + def user_type(self) -> int: + """Return user_type with simple Python type.""" + assert isinstance(self._user_type, JavaObject), "user_type must be JavaObject" + return self._user_type.getCode() + + @user_type.setter + def user_type(self, user_type: JavaObject) -> None: + """Set user_type.""" + assert isinstance(user_type, JavaObject), "user_type must be JavaObject" + self._user_type = user_type + + @property + def is_admin(self) -> bool: + """Return True if user is admin.""" + return self._user_type == 0 + + @property + def is_active(self) -> bool: + """Return True if user is active.""" + return self._state == 1 + + @property + def state(self) -> bool: + """Return user state, we want to make it more readable.""" + return self.is_active + + @state.setter + def state(self, state: int) -> None: + """Set user state.""" + self._state = state + + @staticmethod + def state_p2j(state: bool) -> int: + """Convert Python state to Java state.""" + return 1 if state else 0 + + @classmethod + def create(cls, user_name: str, user_password: str, email: str, phone: str, tenant_code: str, queue: str, state: bool) -> "User": + """Create User. + + Will always create user in admin type in pydolphinscheduler, for operate as much resource as possible. + + :param user_name: username, unique key, used to identify user like get, update, delete + :param user_password: user password for login + :param email: user email + :param phone: user phone + :param tenant_code: the code of tenant who the user belongs to + :param queue: the queue of user to use + :param state: user state, False: inactive, True: active + """ + state_j = cls.state_p2j(state) + return gateway.create_user( + name=user_name, + password=user_password, + email=email, + phone=phone, + tenant_code=tenant_code, + queue=queue, + state=state_j, ) - self.user_id = user.getId() - # TODO recover result checker - # gateway_result_checker(result, None) @classmethod - def get_user(cls, user_id) -> "User": - """Get User.""" - user = gateway.query_user(user_id) + def get(cls, user_name: str) -> "User": + """Get User. + + :param user_name: username, unique key, used to identify user like get, update, delete + """ + user = gateway.query_user(user_name) if user is None: - return cls("") - user_id = user.getId() - user = cls( - name=user.getUserName(), - password=user.getUserPassword(), - email=user.getEmail(), - phone=user.getPhone(), - tenant=user.getTenantCode(), - queue=user.getQueueName(), - status=user.getState(), - ) - user.user_id = user_id + raise ValueError(f"User {user_name} not found.") return user - def update( - self, - password=None, - email=None, - phone=None, - tenant=None, - queue=None, - status=None, - ) -> None: - """Update User.""" - user = gateway.update_user( - self.name, - password, - email, - phone, - tenant, - queue, - status, + @classmethod + def update(cls, user_name: str, user_password: Optional[str] = None, email: Optional[str] = None, phone: Optional[str] = None, tenant_code: Optional[int] = None, queue: Optional[str] = None, state: Optional[bool] = None) -> "User": + """Update User. + + :param user_name: username, unique key, used to identify user like get, update, delete + :param user_password: user password for login + :param email: user email + :param phone: user phone + :param tenant_code: the id of tenant who the user belongs to + :param queue: the queue of user to use + :param state: user state, False: inactive, True: active + """ + original_user = cls.get(user_name) + if user_password: + original_user.user_password = user_password + if email: + original_user.email = email + if phone: + original_user.phone = phone + if tenant_code: + original_user.tenant_id = tenant_code + if queue: + original_user.queue = queue + if state: + state_j = cls.state_p2j(state) + original_user.state = state_j + return gateway.update_user( + user_name=user_name, + user_password=original_user.user_password, + email=original_user.email, + phone=original_user.phone, + tenant_id=original_user.tenant_id, + queue=original_user.queue, + state=original_user.state, ) - self.user_id = user.getId() - self.name = user.getUserName() - self.password = user.getUserPassword() - self.email = user.getEmail() - self.phone = user.getPhone() - self.queue = user.getQueueName() - self.status = user.getState() - - def delete(self) -> None: - """Delete User.""" - gateway.delete_user(self.name, self.user_id) - self.delete_all() + + @classmethod + def delete(cls, user_name) -> "User": + """Delete User. + + :param user_name: username, unique key, used to identify user like get, update, delete + """ + user = cls.get(user_name) + gateway.delete_user(user_name) + return user diff --git a/src/pydolphinscheduler/utils/encode.py b/src/pydolphinscheduler/utils/encode.py new file mode 100644 index 0000000..d893a39 --- /dev/null +++ b/src/pydolphinscheduler/utils/encode.py @@ -0,0 +1,6 @@ +import hashlib + + +def md5(string: str) -> str: + """Encode string with MD5 hashlib.""" + return hashlib.md5(string.encode('utf-8')).hexdigest() diff --git a/tests/core/test_workflow.py b/tests/core/test_workflow.py index e5c8601..c2b6d4e 100644 --- a/tests/core/test_workflow.py +++ b/tests/core/test_workflow.py @@ -28,7 +28,7 @@ from pydolphinscheduler.core.resource import Resource from pydolphinscheduler.core.workflow import Workflow from pydolphinscheduler.exceptions import PyDSParamException -from pydolphinscheduler.models import Project, Tenant, User +from pydolphinscheduler.models import Project, User from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition from pydolphinscheduler.utils.date import conv_to_schedule from tests.testing.task import Task @@ -49,7 +49,6 @@ def test_workflow_key_attr(func): [ ("timezone", configuration.WORKFLOW_TIME_ZONE), ("project", Project(configuration.WORKFLOW_PROJECT)), - ("tenant", Tenant(configuration.WORKFLOW_TENANT)), ( "user", User( @@ -57,7 +56,6 @@ def test_workflow_key_attr(func): configuration.USER_PASSWORD, configuration.USER_EMAIL, configuration.USER_PHONE, - configuration.WORKFLOW_TENANT, configuration.WORKFLOW_QUEUE, configuration.USER_STATE, ), @@ -146,7 +144,6 @@ def test_set_release_state_error(value): "set_attr,set_val,get_attr,get_val", [ ("_project", "project", "project", Project("project")), - ("_tenant", "tenant", "tenant", Tenant("tenant")), ("_start_time", "2021-01-01", "start_time", datetime(2021, 1, 1)), ("_end_time", "2021-01-01", "end_time", datetime(2021, 1, 1)), ], @@ -333,7 +330,6 @@ def test_workflow_get_define_without_task(): "name": TEST_WORKFLOW_NAME, "description": None, "project": configuration.WORKFLOW_PROJECT, - "tenant": configuration.WORKFLOW_TENANT, "workerGroup": configuration.WORKFLOW_WORKER_GROUP, "warningType": configuration.WORKFLOW_WARNING_TYPE, "warningGroupId": 0, @@ -453,32 +449,7 @@ def test_workflow_simple_separate(): pre_task = pd.get_one_task_by_name(f"task-{i - 1}") curr_task.set_upstream(pre_task) assert len(pd.tasks) == expect_tasks_num - assert all(["task-" in task.name for task in pd.task_list]) - - -@pytest.mark.parametrize( - "user_attrs", - [ - {"tenant": "tenant_specific"}, - ], -) -def test_set_workflow_user_attr(user_attrs): - """Test user with correct attributes if we specific assigned to workflow object.""" - default_value = { - "tenant": configuration.WORKFLOW_TENANT, - } - with Workflow(TEST_WORKFLOW_NAME, **user_attrs) as pd: - user = pd.user - for attr in default_value: - # Get assigned attribute if we specific, else get default value - except_attr = ( - user_attrs[attr] if attr in user_attrs else default_value[attr] - ) - # Get actually attribute of user object - actual_attr = getattr(user, attr) - assert ( - except_attr == actual_attr - ), f"Except attribute is {except_attr} but get {actual_attr}" + assert all(["task-" in task.name_chd for task in pd.task_list]) def test_schedule_json_none_schedule(): diff --git a/tests/example/test_example.py b/tests/example/test_example.py index dbe9c5f..30c277f 100644 --- a/tests/example/test_example.py +++ b/tests/example/test_example.py @@ -70,17 +70,17 @@ def setup_and_teardown_for_stuff(): def submit_check_without_same_name(self): """Side effect for verifying workflow name and adding it to global variable.""" - if self.name in workflow_name: + if self.name_chd in workflow_name: raise ValueError( "Example workflow should not have same name, but get duplicate name: %s", - self.name, + self.name_chd, ) submit_add_workflow(self) def submit_add_workflow(self): """Side effect for adding workflow name to global variable.""" - workflow_name.add(self.name) + workflow_name.add(self.name_chd) def test_example_basic(): diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c15b897..bd7b7d2 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -24,7 +24,7 @@ from tests.testing.docker_wrapper import DockerWrapper -@pytest.fixture(scope="package", autouse=True) +# @pytest.fixture(scope="package", autouse=True) def docker_setup_teardown(): """Fixture for whole package tests, Set up and teardown docker env. diff --git a/tests/integration/test_tenant.py b/tests/integration/test_tenant.py index c1ec33c..5798e4b 100644 --- a/tests/integration/test_tenant.py +++ b/tests/integration/test_tenant.py @@ -15,72 +15,61 @@ # specific language governing permissions and limitations # under the License. -"""Test pydolphinscheduler tenant.""" +"""Test pydolphinscheduler models tenant.""" + +from typing import Dict + import pytest from pydolphinscheduler.models import Tenant, User +tenant_code = "tenant-code" +queue_name = "tenant-name" +description = "description-of-this-tenant" + + +def test_create(): + """Test create tenant.""" + tenant = Tenant.create(tenant_code, queue_name, description) + assert tenant is not None + assert tenant.id is not None and tenant.tenant_code == tenant_code and tenant.queue_name == queue_name and tenant.description == description + + +def test_get(): + """Test get tenant.""" + tenant = Tenant.get(tenant_code) + assert tenant is not None + assert tenant.id is not None and tenant.tenant_code == tenant_code and tenant.queue_name == queue_name and tenant.description == description + + +def test_get_error(): + """Test get tenant error.""" + with pytest.raises(ValueError, match="Tenant.*not found."): + Tenant.get(tenant_code="not-exists-tenant") + + +@pytest.mark.parametrize( + "update_params, expected", + [ + ({"queue_name": "new_queue_name"}, {"queue_name": "new_queue_name"}), + ({"description": "new_description"}, {"description": "new_description"}), + ({"queue_name": "new_queue_name", "description": "new_description"}, {"queue_name": "new_queue_name", "description": "new_description"}), + ] +) +def test_update(update_params: Dict, expected: Dict): + """Test update tenant.""" + # previous user attributes not equal to expected + original_tenant = Tenant.get(tenant_code=tenant_code) + assert all([getattr(original_tenant, exp) != expected.get(exp) for exp in expected]) + + # post user attributes equal to expected + update_tenant = Tenant.update(tenant_code, **update_params) + assert update_tenant is not None and update_tenant.id is not None and update_tenant.id == original_tenant.id and update_tenant.tenant_code == original_tenant.tenant_code + assert update_tenant.id is not None and update_tenant.tenant_code == tenant_code and update_tenant.queue_name == queue_name and update_tenant.description == description + -def get_user( - name="test-name", - password="test-password", - email="test-email@abc.com", - phone="17366637777", - tenant="test-tenant", - queue="test-queue", - status=1, -): - """Get a test user.""" - user = User(name, password, email, phone, tenant, queue, status) - user.create_if_not_exists() - return user - - -def get_tenant( - name="test-name-1", - queue="test-queue-1", - description="test-description", - tenant_code="test-tenant-code", - user_name=None, -): - """Get a test tenant.""" - tenant = Tenant(name, queue, description, code=tenant_code, user_name=user_name) - tenant.create_if_not_exists(name) - return tenant - - -def test_create_tenant(): - """Test create tenant from java gateway.""" - tenant = get_tenant() - assert tenant.tenant_id is not None - - -def test_get_tenant(): - """Test get tenant from java gateway.""" - tenant = get_tenant() - tenant_ = Tenant.get_tenant(tenant.code) - assert tenant_.tenant_id == tenant.tenant_id - - -def test_update_tenant(): - """Test update tenant from java gateway.""" - tenant = get_tenant(user_name="admin") - tenant.update( - user="admin", - code="test-code-updated", - queue_id=1, - description="test-description-updated", - ) - tenant_ = Tenant.get_tenant(code=tenant.code) - assert tenant_.code == "test-code-updated" - assert tenant_.queue == 1 - - -def test_delete_tenant(): - """Test delete tenant from java gateway.""" - tenant = get_tenant(user_name="admin") - tenant.delete() - with pytest.raises(AttributeError) as excinfo: - _ = tenant.tenant_id - - assert excinfo.type == AttributeError +def test_delete(): + """Test delete tenant.""" + tenant = Tenant.update(tenant_code) + assert tenant is not None + assert tenant.id is not None and tenant.tenant_code == tenant_code and tenant.queue_name == queue_name and tenant.description == description diff --git a/tests/integration/test_user.py b/tests/integration/test_user.py index 74248fa..0986032 100644 --- a/tests/integration/test_user.py +++ b/tests/integration/test_user.py @@ -15,93 +15,77 @@ # specific language governing permissions and limitations # under the License. -"""Test pydolphinscheduler user.""" +"""Test pydolphinscheduler model user.""" -import hashlib +from typing import Dict import pytest from pydolphinscheduler.models import User +from pydolphinscheduler.utils import encode -def md5(str): - """MD5 a string.""" - hl = hashlib.md5() - hl.update(str.encode(encoding="utf-8")) - return hl.hexdigest() - - -def get_user( - name="test-name", - password="test-password", - email="test-email@abc.com", - phone="17366637777", - tenant="test-tenant", - queue="test-queue", - status=1, -): - """Get a test user.""" - user = User( - name=name, - password=password, - email=email, - phone=phone, - tenant=tenant, - queue=queue, - status=status, - ) - user.create_if_not_exists() - return user - - -def test_create_user(): - """Test weather client could connect java gate way or not.""" - user = User( - name="test-name", - password="test-password", - email="test-email@abc.com", - phone="17366637777", - tenant="test-tenant", - queue="test-queue", - status=1, - ) - user.create_if_not_exists() - assert user.user_id is not None - - -def test_get_user(): - """Test get user from java gateway.""" - user = get_user() - user_ = User.get_user(user.user_id) - assert user_.password == md5(user.password) - assert user_.email == user.email - assert user_.phone == user.phone - assert user_.status == user.status - - -def test_update_user(): - """Test update user from java gateway.""" - user = get_user() - user.update( - password="test-password-", - email="test-email-updated@abc.com", - phone="17366637766", - tenant="test-tenant-updated", - queue="test-queue-updated", - status=2, - ) - user_ = User.get_user(user.user_id) - assert user_.password == md5("test-password-") - assert user_.email == "test-email-updated@abc.com" - assert user_.phone == "17366637766" - assert user_.status == 2 - - -def test_delete_user(): - """Test delete user from java gateway.""" - user = get_user() - user.delete() - with pytest.raises(AttributeError) as excinfo: - _ = user.user_id - - assert excinfo.type == AttributeError +tenant_code = "tenant-code" +queue_name = "tenant-name" +description = "description-of-this-tenant" + + +user_name = "test-name" +user_password = "test-password" +email = "dolphinscheduler@apache.org" +phone = "12345678912" +tenant_code = "tenant-code" +queue = "test-queue" +state = True + + +def test_create(): + """Test create user.""" + user = User.create(user_name=user_name, user_password=user_password, email=email, phone=phone, tenant_code=tenant_code, queue=queue, state=state) + assert user is not None + assert user.id is not None and user.tenant_id is not None and user.user_name == user_name and user.user_password == encode.md5(user_password) and user.email == email and user.phone == phone and user.queue == queue and user.state == state + + +def test_get(): + """Test get a single user.""" + user = User.get(user_name=user_name) + assert user is not None + assert user.id is not None and user.tenant_id is not None and user.user_name == user_name and user.user_password == encode.md5(user_password) and user.email == email and user.phone == phone and user.queue == queue and user.state == state + + +def test_get_error(): + """Test get user error.""" + with pytest.raises(ValueError, match="User.*not found."): + User.get(user_name="not-exists-user") + + +@pytest.mark.parametrize( + "update_params, expected", + [ + ({"password": "new-password"}, {"user_password": encode.md5("new-password")}), + ({"email": "test-dolphinscheduler@apache.org"}, {"email": "test-dolphinscheduler@apache.org"}), + ({"phone": reversed(phone)}, {"phone": reversed(phone)}), + ({"state": False}, {"state": False}), + ({"email": "test-dolphinscheduler@apache.org", "phone": reversed(phone)}, {"email": "test-dolphinscheduler@apache.org", "phone": reversed(phone)}), + ] +) +def test_update(update_params: Dict, expected: Dict): + """Test update a single user.""" + # previous user attributes not equal to expected + original_user = User.get(user_name=user_name) + assert all([getattr(original_user, exp) != expected.get(exp) for exp in expected]) + + # post user attributes equal to expected + update_user = User.update(user_name=user_name, **update_params) + assert update_user is not None and update_user.id is not None and update_user.id == original_user.id and update_user.user_name == original_user.user_name + assert all([getattr(update_user, exp) == expected.get(exp) for exp in expected]) + + +def test_delete(): + """Test delete a single user.""" + exists_user = User.get(user_name) + assert exists_user is not None + + User.delete(user_name=user_name) + user = User.get(user_name) + assert user is None diff --git a/tests/tasks/test_condition.py b/tests/tasks/test_condition.py index 700f418..c50e457 100644 --- a/tests/tasks/test_condition.py +++ b/tests/tasks/test_condition.py @@ -424,7 +424,7 @@ def test_condition_set_dep_workflow(mock_task_code_version): # General tasks test assert len(pd.tasks) == 6 - assert sorted(pd.task_list, key=lambda t: t.name) == sorted( + assert sorted(pd.task_list, key=lambda t: t.name_chd) == sorted( [ pre_task_1, pre_task_2, @@ -433,7 +433,7 @@ def test_condition_set_dep_workflow(mock_task_code_version): fail_branch, condition, ], - key=lambda t: t.name, + key=lambda t: t.name_chd, ) # Task dep test assert success_branch._upstream_task_codes == {condition.code} diff --git a/tests/tasks/test_func_wrap.py b/tests/tasks/test_func_wrap.py index 8c94c8e..0cf14ba 100644 --- a/tests/tasks/test_func_wrap.py +++ b/tests/tasks/test_func_wrap.py @@ -48,7 +48,7 @@ def foo(): assert len(pd.tasks) == 1 pd_task = pd.tasks[12345] - assert pd_task.name == "foo" + assert pd_task.name_chd == "foo" assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()" @@ -69,7 +69,7 @@ def foo(): assert len(pd.tasks) == 1 pd_task = pd.tasks[12345] - assert pd_task.name == "foo" + assert pd_task.name_chd == "foo" assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()" diff --git a/tests/tasks/test_switch.py b/tests/tasks/test_switch.py index 37c3b44..71ccea9 100644 --- a/tests/tasks/test_switch.py +++ b/tests/tasks/test_switch.py @@ -279,8 +279,8 @@ def test_switch_set_dep_workflow(mock_task_code_version): parent >> switch # General tasks test assert len(pd.tasks) == 4 - assert sorted(pd.task_list, key=lambda t: t.name) == sorted( - [parent, switch, switch_child_1, switch_child_2], key=lambda t: t.name + assert sorted(pd.task_list, key=lambda t: t.name_chd) == sorted( + [parent, switch, switch_child_1, switch_child_2], key=lambda t: t.name_chd ) # Task dep test assert parent._downstream_task_codes == {switch.code} diff --git a/tests/utils/test_string.py b/tests/utils/test_string.py index 2ccd206..dbf056a 100644 --- a/tests/utils/test_string.py +++ b/tests/utils/test_string.py @@ -33,6 +33,7 @@ ("_snake_case", "SnakeCase"), ("__snake_case", "SnakeCase"), ("Snake_case", "SnakeCase"), + ("snake_case_", "snakeCase"), ], ) def test_snake2camel(snake: str, expect: str):