Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[impv] Define workflow without create user, queue, tenant #40

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/pydolphinscheduler/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
set_single_config,
)
from pydolphinscheduler.core.yaml_workflow import create_workflow
from pydolphinscheduler.cli.tenants import tenants

version_option_val = ["major", "minor", "micro"]

Expand Down Expand Up @@ -104,3 +105,6 @@ def config(getter, setter, init) -> None:
def yaml(yaml_file) -> None:
"""Create workflow using YAML file."""
create_workflow(yaml_file)


cli.add_command(tenants)
64 changes: 64 additions & 0 deletions src/pydolphinscheduler/cli/tenants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import click

from pydolphinscheduler import configuration
from pydolphinscheduler.models.tenant import Tenant


@click.help_option()
def tenants() -> click.group:
"""Users subcommand group."""


@click.group()
def tenants() -> click.group:
"""Users subcommand group."""


@tenants.command()
@click.option(
"-n", "--name", "name",
required=True,
type=str,
)
@click.option(
"-q", "--queue-name", "queue_name",
required=True,
type=str,
)
@click.option(
"-d", "--description", "description",
required=True,
type=str,
)
def create(name, queue_name, description):
tenant = Tenant.get(name)
if tenant:
click.echo(f"Tenant with name {name} already exists.", err=True)
new_tenant = Tenant.create(name, queue_name, description)
click.echo(f"Tenant {new_tenant.name} had been created.")


@tenants.command()
@click.option(
"-n", "--name", "name",
required=True,
type=str,
)
def delete(name):
tenant = Tenant.delete(name)
if not tenant:
click.echo(f"Tenant with name {name} not exists.", err=True)
click.echo(f"Tenant: {tenant}.")


@tenants.command()
@click.option(
"-n", "--name", "name",
required=True,
type=str,
)
def get(name):
tenant = Tenant.get(name)
if not tenant:
click.echo(f"Tenant with name {name} not exists.", err=True)
click.echo(f"Tenant: {tenant}.")
Empty file.
22 changes: 5 additions & 17 deletions src/pydolphinscheduler/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
from pydolphinscheduler.java_gateway import gateway
from pydolphinscheduler.models import Base, Project, Tenant, User
from pydolphinscheduler.models import Base, Project, User
from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, conv_to_schedule


Expand Down Expand Up @@ -96,7 +96,6 @@ class Workflow(Base):
"name",
"description",
"_project",
"_tenant",
"worker_group",
"warning_type",
"warning_group_id",
Expand All @@ -120,7 +119,6 @@ def __init__(
timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE,
user: Optional[str] = configuration.WORKFLOW_USER,
project: Optional[str] = configuration.WORKFLOW_PROJECT,
tenant: Optional[str] = configuration.WORKFLOW_TENANT,
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
warning_group_id: Optional[int] = 0,
Expand All @@ -130,6 +128,8 @@ def __init__(
param: Optional[Dict] = None,
resource_plugin: Optional[ResourcePlugin] = None,
resource_list: Optional[List[Resource]] = None,
*args,
**kwargs,
):
super().__init__(name, description)
self.schedule = schedule
Expand All @@ -138,7 +138,6 @@ def __init__(
self.timezone = timezone
self._user = user
self._project = project
self._tenant = tenant
self.worker_group = worker_group
self.warning_type = warning_type
if warning_type.strip().upper() not in ("FAILURE", "SUCCESS", "ALL", "NONE"):
Expand Down Expand Up @@ -176,16 +175,6 @@ def __enter__(self) -> "Workflow":
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
WorkflowContext.delete()

@property
def tenant(self) -> Tenant:
"""Get attribute tenant."""
return Tenant(self._tenant)

@tenant.setter
def tenant(self, tenant: Tenant) -> None:
"""Set attribute tenant."""
self._tenant = tenant.name

@property
def project(self) -> Project:
"""Get attribute project."""
Expand All @@ -202,7 +191,7 @@ def user(self) -> User:

For now we just get from python models but not from java gateway models, so it may not correct.
"""
return User(name=self._user, tenant=self._tenant)
return User(name=self._user)

@staticmethod
def _parse_datetime(val: Any) -> Any:
Expand Down Expand Up @@ -359,7 +348,7 @@ def get_tasks_by_name(self, name: str) -> Set["Task"]: # noqa: F821
"""Get tasks object by given name, if will return all tasks with this name."""
find = set()
for task in self.tasks.values():
if task.name == name:
if task.name_chd == name:
find.add(task)
return find

Expand Down Expand Up @@ -436,7 +425,6 @@ def submit(self) -> int:
self.execution_type,
self.timeout,
self.worker_group,
self._tenant,
self.release_state,
# TODO add serialization function
json.dumps(self.task_relation_json),
Expand Down
23 changes: 23 additions & 0 deletions src/pydolphinscheduler/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from functools import wraps
from typing import Callable

from pydolphinscheduler import configuration


def provide_user(func: Callable) -> Callable:
"""Decorator provides user from configuration when user is not provided.

If you want to reuse a session or run the function as part of a
database transaction, you pass it to the function, if not this wrapper
will create one and close it for you.
"""
session_args_idx = find_session_idx(func)

@wraps(func)
def wrapper(*args, **kwargs) -> Callable:
if "user" in kwargs or session_args_idx < len(args):
return func(*args, **kwargs)
else:
return func(*args, user=configuration.USER_NAME, **kwargs)

return wrapper
26 changes: 14 additions & 12 deletions src/pydolphinscheduler/examples/tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,24 @@
start_time="2021-01-01",
tenant="tenant_exists",
) as pd:

wf = Workflow()
# [end workflow_declare]
# [start task_declare]
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
task_child_one = Shell(name="task_child_one", command="echo 'child one'")
task_child_two = Shell(name="task_child_two", command="echo 'child two'")
task_union = Shell(name="task_union", command="echo union")
# [end task_declare]
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler", workflow=wf)
task_child_one = Shell(name="task_child_one", command="echo 'child one'")
task_child_two = Shell(name="task_child_two", command="echo 'child two'")
task_union = Shell(name="task_union", command="echo union")
# [end task_declare]

# [start task_relation_declare]
task_group = [task_child_one, task_child_two]
task_parent.set_downstream(task_group)
# [start task_relation_declare]
task_group = [task_child_one, task_child_two]
task_parent.set_downstream(task_group)

task_union << task_group
# [end task_relation_declare]
task_union << task_group
# [end task_relation_declare]

# [start submit_or_run]
pd.run()
# [start submit_or_run]
pd.submit()
# [end submit_or_run]
# [end tutorial]
40 changes: 21 additions & 19 deletions src/pydolphinscheduler/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from typing import Any, Optional

from py4j.java_collections import JavaMap
from py4j.java_gateway import GatewayParameters, JavaGateway
from py4j.java_gateway import GatewayParameters, JavaGateway, JavaObject
from py4j.protocol import Py4JError

from pydolphinscheduler import __version__, configuration
Expand Down Expand Up @@ -160,11 +160,11 @@ def delete_project(self, user: str, code: int):
return self.gateway.entry_point.deleteProject(user, code)

def create_tenant(
self, tenant_name: str, queue_name: str, description: Optional[str] = None
):
self, tenant_code: str, queue_name: str, description: Optional[str] = None
) -> JavaObject:
"""Create tenant through java gateway."""
return self.gateway.entry_point.createTenant(
tenant_name, description, queue_name
tenant_code, description, queue_name
)

def query_tenant(self, tenant_code: str):
Expand All @@ -177,19 +177,23 @@ def grant_tenant_to_user(self, user_name: str, tenant_code: str):

def update_tenant(
self,
user: str,
tenant_id: int,
code: str,
queue_id: int,
description: Optional[str] = None,
):
"""Update tenant through java gateway."""
id_: int,
tenant_code: str,
queue_id: str,
description: str,
user: Optional[str] = configuration.USER_NAME,
) -> JavaObject:
"""Update tenant by tenant_code through java gateway."""
return self.gateway.entry_point.updateTenant(
user, tenant_id, code, queue_id, description
user, id_, tenant_code, queue_id, description
)

def delete_tenant(self, user: str, tenant_id: int):
"""Delete tenant through java gateway."""
def delete_tenant(self, tenant_code: str, user: Optional[str] = configuration.USER_NAME) -> None:
"""Delete tenant by tenant_code through java gateway.

:tenant_code: tenant code for identification resource
:user: username to delete tenant
"""
return self.gateway.entry_point.deleteTenantById(user, tenant_id)

def create_user(
Expand All @@ -198,13 +202,13 @@ def create_user(
password: str,
email: str,
phone: str,
tenant: str,
tenant_code: str,
queue: str,
status: int,
state: int,
):
"""Create user through java gateway."""
return self.gateway.entry_point.createUser(
name, password, email, phone, tenant, queue, status
name, password, email, phone, tenant_code, queue, state
)

def query_user(self, user_id: int):
Expand Down Expand Up @@ -259,7 +263,6 @@ def create_or_update_workflow(
execution_type: str,
timeout: int,
worker_group: str,
tenant_code: str,
release_state: int,
task_relation_json: str,
task_definition_json: str,
Expand All @@ -278,7 +281,6 @@ def create_or_update_workflow(
warning_group_id,
timeout,
worker_group,
tenant_code,
release_state,
task_relation_json,
task_definition_json,
Expand Down
1 change: 1 addition & 0 deletions src/pydolphinscheduler/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@
"Queue",
"WorkerGroup",
]

10 changes: 2 additions & 8 deletions src/pydolphinscheduler/models/base_side.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,8 @@ def create_if_not_exists(
# TODO comment for avoiding cycle import
# user: Optional[User] = ProcessDefinitionDefault.USER
user=configuration.WORKFLOW_USER,
*args,
**kwargs,
):
"""Create Base if not exists."""
raise NotImplementedError

def delete_all(self):
"""Delete all method."""
if not self:
return
list_pro = [key for key in self.__dict__.keys()]
for key in list_pro:
self.__delattr__(key)
47 changes: 47 additions & 0 deletions src/pydolphinscheduler/models/meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from functools import wraps
from inspect import signature
from typing import Tuple, Dict

from py4j.java_gateway import JavaObject
from pydolphinscheduler.utils.string import snake2camel


class ModelMeta(type):
_FUNC_INIT = "__init__"
_PARAM_SELF = "self"

def __new__(mcs, name: str, bases: Tuple, attrs: Dict):

if mcs._FUNC_INIT not in attrs:
raise TypeError("Class with mateclass %s must have %s method", (mcs.__name__, mcs._FUNC_INIT))

sig = signature(attrs.get(mcs._FUNC_INIT))
param = [param.name for name, param in sig.parameters.items() if name != mcs._PARAM_SELF]

for attr_name, attr_value in attrs.items():
if isinstance(attr_value, classmethod) and not attr_name.startswith("__"):
attrs[attr_name] = mcs.j2p(attr_value, name, attrs, param)
return super(ModelMeta, mcs).__new__(mcs, name, bases, attrs)

@classmethod
def j2p(mcs, cm: classmethod, name: str, attrs: Dict, params=None):
@wraps(cm)
def wrapper(*args, **kwargs):
class_ = type(name, (), attrs)

java_obj = cm.__func__(class_, *args, **kwargs)
assert isinstance(java_obj, JavaObject), "The function %s must return JavaObject" % cm.__func__.__name__

obj_init_params = []
for param in params:
java_func_name = mcs.py4j_attr_func_name(param)
java_func = getattr(java_obj, java_func_name)
obj_init_params.append(java_func())

return class_(*obj_init_params)

return wrapper

@classmethod
def py4j_attr_func_name(mcs, name: str) -> str:
return snake2camel(f"get_{name}")
Loading