diff --git a/.coveragerc b/.coveragerc deleted file mode 100644 index c2c57943e3..0000000000 --- a/.coveragerc +++ /dev/null @@ -1,8 +0,0 @@ -[run] -source = - . -omit = - src* - downloads* - sql/migrations/* - venv* diff --git a/.github/workflows/django.yml b/.github/workflows/django.yml index 063a8b76f9..af6657d938 100644 --- a/.github/workflows/django.yml +++ b/.github/workflows/django.yml @@ -72,7 +72,8 @@ jobs: run: | sudo apt-get update && sudo apt-get install libsasl2-dev libkrb5-dev libldap2-dev libssl-dev unixodbc unixodbc-dev python -m pip install --upgrade pip - pip install codecov coverage flake8 -r requirements.txt + pip install -r requirements.txt + pip install -r dev-requirements.txt - name: Init Table run: | @@ -83,8 +84,7 @@ jobs: run: | python manage.py makemigrations python manage.py makemigrations sql - coverage run manage.py test -v 3 --keepdb - coverage xml + pytest --cov --cov-report xml - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/.gitignore b/.gitignore index 20acd0b102..8cfc160d38 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,5 @@ sonar-project.properties .scannerwork .env local_settings.py -src/docker-compose-dev \ No newline at end of file +src/docker-compose-dev +.coverage \ No newline at end of file diff --git a/archery/settings.py b/archery/settings.py index 8c6e35da0e..dfc3d3768c 100644 --- a/archery/settings.py +++ b/archery/settings.py @@ -66,6 +66,7 @@ "sql.notify:GenericWebhookNotifier", ], ), + CURRENT_AUDITOR=(str, "sql.utils.workflow_audit:AuditV2"), ) # SECURITY WARNING: keep the secret key used in production secret! @@ -103,6 +104,8 @@ ENABLED_ENGINES = env("ENABLED_ENGINES") +CURRENT_AUDITOR = env("CURRENT_AUDITOR") + # Application definition INSTALLED_APPS = ( "django.contrib.admin", diff --git a/common/utils/const.py b/common/utils/const.py index 2a1f336cbd..c1c6f090c7 100644 --- a/common/utils/const.py +++ b/common/utils/const.py @@ -23,6 +23,18 @@ class WorkflowStatus(models.IntegerChoices): ABORTED = 3, "审核取消" +class WorkflowAction(models.IntegerChoices): + """工单操作列表, 必须是动词, 不是一种状态""" + + SUBMIT = 0, "提交" + PASS = 1, "审核通过" + REJECT = 2, "审核不通过" + ABORT = 3, "审核取消" + EXECUTE_SET_TIME = 4, "设置定时执行" + EXECUTE_START = 5, "开始执行" + EXECUTE_END = 6, "执行结束" + + class SQLTuning: SYS_PARM_FILTER = [ "BINLOG_CACHE_SIZE", diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000000..5e13608945 --- /dev/null +++ b/conftest.py @@ -0,0 +1,129 @@ +import datetime + +import pytest + +from common.utils.const import WorkflowStatus +from sql.models import ( + Instance, + ResourceGroup, + SqlWorkflow, + SqlWorkflowContent, + QueryPrivilegesApply, + ArchiveConfig, +) +from common.config import SysConfig + + +@pytest.fixture +def normal_user(django_user_model): + user = django_user_model.objects.create( + username="test_user", display="中文显示", is_active=True + ) + yield user + user.delete() + + +@pytest.fixture +def super_user(django_user_model): + user = django_user_model.objects.create( + username="super_user", display="超级用户", is_active=True, is_superuser=True + ) + yield user + user.delete() + + +@pytest.fixture +def db_instance(db): + ins = Instance.objects.create( + instance_name="some_ins", + type="slave", + db_type="mysql", + host="some_host", + port=3306, + user="ins_user", + password="some_str", + ) + yield ins + ins.delete() + + +@pytest.fixture +def resource_group(db) -> ResourceGroup: + res_group = ResourceGroup.objects.create(group_id=1, group_name="group_name") + yield res_group + res_group.delete() + + +@pytest.fixture +def sql_workflow(db_instance): + wf = SqlWorkflow.objects.create( + workflow_name="some_name", + group_id=1, + group_name="g1", + engineer_display="", + audit_auth_groups="some_audit_group", + create_time=datetime.datetime.now(), + status="workflow_timingtask", + is_backup=True, + instance=db_instance, + db_name="some_db", + syntax_type=1, + ) + wf_content = SqlWorkflowContent.objects.create( + workflow=wf, sql_content="some_sql", execute_result="" + ) + yield wf, wf_content + wf.delete() + wf_content.delete() + + +@pytest.fixture +def sql_query_apply(db_instance): + tomorrow = datetime.datetime.today() + datetime.timedelta(days=1) + query_apply_1 = QueryPrivilegesApply.objects.create( + group_id=1, + group_name="some_name", + title="some_title1", + user_name="some_user", + instance=db_instance, + db_list="some_db,some_db2", + limit_num=100, + valid_date=tomorrow, + priv_type=1, + status=0, + audit_auth_groups="some_audit_group", + ) + yield query_apply_1 + query_apply_1.delete() + + +@pytest.fixture +def archive_apply(db_instance, resource_group): + archive_apply_1 = ArchiveConfig.objects.create( + title="title", + resource_group=resource_group, + audit_auth_groups="", + src_instance=db_instance, + src_db_name="src_db_name", + src_table_name="src_table_name", + dest_instance=db_instance, + dest_db_name="src_db_name", + dest_table_name="src_table_name", + condition="1=1", + mode="file", + no_delete=True, + sleep=1, + status=WorkflowStatus.WAITING, + state=False, + user_name="some_user", + user_display="display", + ) + yield archive_apply_1 + archive_apply_1.delete() + + +@pytest.fixture +def setup_sys_config(db): + sys_config = SysConfig() + yield sys_config + sys_config.purge() diff --git a/dev-requirements.txt b/dev-requirements.txt new file mode 100644 index 0000000000..0b20185751 --- /dev/null +++ b/dev-requirements.txt @@ -0,0 +1,6 @@ +pytest +pytest-django +pytest-mock +pytest-cov +codecov +flake8 \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000000..a53e3a2af6 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,28 @@ +[tool.pytest.ini_options] +DJANGO_SETTINGS_MODULE = "archery.settings" +python_files = "tests.py test_*.py *_tests.py" + +[tool.coverage.run] +source = [ + "." +] +omit = [ + # omit anything in a .local directory anywhere + "src*", + # omit everything in /usr + "downloads*", + # omit this single file + "sql/migrations/*", + "venv*" +] + +[tool.coverage.report] +omit = [ + # omit anything in a .local directory anywhere + "src*", + # omit everything in /usr + "downloads*", + # omit this single file + "sql/migrations/*", + "venv*" +] \ No newline at end of file diff --git a/sql/archiver.py b/sql/archiver.py index 61bea7dd25..7b9416135c 100644 --- a/sql/archiver.py +++ b/sql/archiver.py @@ -22,7 +22,7 @@ from django.urls import reverse from django_q.tasks import async_task -from common.utils.const import WorkflowStatus, WorkflowType +from common.utils.const import WorkflowStatus, WorkflowType, WorkflowAction from common.utils.extend_json_encoder import ExtendJSONEncoder from common.utils.timer import FuncTimer from sql.engines import get_engine @@ -30,7 +30,7 @@ from sql.plugins.pt_archiver import PtArchiver from sql.utils.resource_group import user_instances, user_groups from sql.models import ArchiveConfig, ArchiveLog, Instance, ResourceGroup -from sql.utils.workflow_audit import Audit +from sql.utils.workflow_audit import get_auditor, AuditException, Audit logger = logging.getLogger("default") __author__ = "hhyo" @@ -171,54 +171,55 @@ def archive_apply(request): # 获取资源组和审批信息 res_group = ResourceGroup.objects.get(group_name=group_name) - audit_auth_groups = Audit.settings(res_group.group_id, WorkflowType.ARCHIVE) - if not audit_auth_groups: - return JsonResponse({"status": 1, "msg": "审批流程不能为空,请先配置审批流程", "data": {}}) - # 使用事务保持数据一致性 - try: - with transaction.atomic(): - # 保存申请信息到数据库 - archive_info = ArchiveConfig.objects.create( - title=title, - resource_group=res_group, - audit_auth_groups=audit_auth_groups, - src_instance=s_ins, - src_db_name=src_db_name, - src_table_name=src_table_name, - dest_instance=d_ins, - dest_db_name=dest_db_name, - dest_table_name=dest_table_name, - condition=condition, - mode=mode, - no_delete=no_delete, - sleep=sleep, - status=WorkflowStatus.WAITING, - state=False, - user_name=user.username, - user_display=user.display, - ) - archive_id = archive_info.id - # 调用工作流插入审核信息 - audit_result, audit_detail = Audit.add(WorkflowType.ARCHIVE, archive_id) - except Exception as msg: - logger.error(traceback.format_exc()) - result["status"] = 1 - result["msg"] = str(msg) - else: - result = audit_result - # 消息通知 - workflow_audit = Audit.detail_by_workflow_id( - workflow_id=archive_id, workflow_type=WorkflowType.ARCHIVE + with transaction.atomic(): + # 保存申请信息到数据库 + archive_info = ArchiveConfig( + title=title, + resource_group=res_group, + audit_auth_groups="", + src_instance=s_ins, + src_db_name=src_db_name, + src_table_name=src_table_name, + dest_instance=d_ins, + dest_db_name=dest_db_name, + dest_table_name=dest_table_name, + condition=condition, + mode=mode, + no_delete=no_delete, + sleep=sleep, + status=WorkflowStatus.WAITING, + state=False, + user_name=user.username, + user_display=user.display, + ) + audit_handler = get_auditor( + workflow=archive_info, + resource_group=res_group.group_name, + resource_group_id=res_group.group_id, ) + + try: + audit_handler.create_audit() + except AuditException as e: + logger.error(f"新建审批流失败: {str(e)}") + return JsonResponse({"status": 1, "msg": "新建审批流失败, 请联系管理员", "data": {}}) async_task( notify_for_audit, - workflow_audit=workflow_audit, - workflow_audit_detail=audit_detail, + workflow_audit=audit_handler.audit, timeout=60, - task_name=f"archive-apply-{archive_id}", + task_name=f"archive-apply-{audit_handler.workflow.id}", ) - return HttpResponse(json.dumps(result), content_type="application/json") + return JsonResponse( + { + "status": 0, + "msg": "", + "data": { + "workflow_status": audit_handler.audit.current_status, + "audit_id": audit_handler.audit.audit_id, + }, + } + ) @permission_required("sql.archive_review", raise_exception=True) @@ -229,51 +230,46 @@ def archive_audit(request): :return: """ # 获取用户信息 - user = request.user archive_id = int(request.POST["archive_id"]) - audit_status = int(request.POST["audit_status"]) + try: + audit_status = WorkflowAction(int(request.POST["audit_status"])) + except ValueError as e: + return render( + request, + "error.html", + {"errMsg": f"数据错误, 不允许的操作, 请检查 audit_status, error: {str(e)}"}, + ) audit_remark = request.POST.get("audit_remark") if audit_remark is None: audit_remark = "" + try: + archive_workflow = ArchiveConfig.objects.get(id=archive_id) + except ArchiveConfig.DoesNotExist: + return render(request, "error.html", {"errMsg": "工单不存在"}) - if Audit.can_review(request.user, archive_id, 3) is False: - context = {"errMsg": "你无权操作当前工单!"} - return render(request, "error.html", context) + resource_group = archive_workflow.resource_group + auditor = get_auditor(workflow=archive_workflow, resource_group=resource_group) # 使用事务保持数据一致性 - try: - with transaction.atomic(): - workflow_audit = Audit.detail_by_workflow_id( - workflow_id=archive_id, - workflow_type=WorkflowType.ARCHIVE, - ) - audit_id = workflow_audit.audit_id - - # 调用工作流插入审核信息,更新业务表审核状态 - audit_status, workflow_audit_detail = Audit.audit( - audit_id, audit_status, user.username, audit_remark + with transaction.atomic(): + try: + workflow_audit_detail = auditor.operate( + audit_status, request.user, audit_remark ) - audit_status = audit_status["data"]["workflow_status"] - ArchiveConfig( - id=archive_id, - status=audit_status, - state=True if audit_status == WorkflowStatus.PASSED else False, - ).save(update_fields=["status", "state"]) - except Exception as msg: - logger.error(traceback.format_exc()) - context = {"errMsg": msg} - return render(request, "error.html", context) - else: - # 消息通知 - workflow_audit.refresh_from_db() - async_task( - notify_for_audit, - workflow_audit=workflow_audit, - workflow_audit_detail=workflow_audit_detail, - timeout=60, - task_name=f"archive-audit-{archive_id}", - ) + except AuditException as e: + return render(request, "error.html", {"errMsg": f"审核失败: {str(e)}"}) + auditor.workflow.status = auditor.audit.current_status + if auditor.audit.current_status == WorkflowStatus.PASSED: + auditor.workflow.state = True + auditor.workflow.save() + async_task( + notify_for_audit, + workflow_audit=auditor.audit, + workflow_audit_detail=workflow_audit_detail, + timeout=60, + task_name=f"archive-audit-{archive_id}", + ) return HttpResponseRedirect(reverse("sql:archive_detail", args=(archive_id,))) diff --git a/sql/models.py b/sql/models.py index 20983a3fa8..fc01328c27 100755 --- a/sql/models.py +++ b/sql/models.py @@ -5,7 +5,7 @@ from django.utils.translation import gettext as _ from mirage.crypto import Crypto -from common.utils.const import WorkflowStatus, WorkflowType +from common.utils.const import WorkflowStatus, WorkflowType, WorkflowAction class ResourceGroup(models.Model): @@ -89,9 +89,6 @@ class TwoFactorAuthConfig(models.Model): ) user = models.ForeignKey(Users, on_delete=models.CASCADE) - def __int__(self): - return self.username - class Meta: managed = True db_table = "2fa_config" @@ -341,6 +338,8 @@ class Meta: class WorkflowAuditDetail(models.Model): """ 审批明细表 + TODO + 部分字段与 WorkflowLog 重复, 建议整合到一起) """ audit_detail_id = models.AutoField(primary_key=True) @@ -390,19 +389,10 @@ class WorkflowLog(models.Model): 工作流日志表 """ - operation_type_choices = ( - (0, "提交/待审核"), - (1, "审核通过"), - (2, "审核不通过"), - (3, "审核取消"), - (4, "定时执行"), - (5, "执行工单"), - (6, "执行结束"), - ) - id = models.AutoField(primary_key=True) audit_id = models.IntegerField("工单审批id", db_index=True) - operation_type = models.SmallIntegerField("操作类型", choices=operation_type_choices) + operation_type = models.SmallIntegerField("操作类型", choices=WorkflowAction.choices) + # operation_type_desc 字段实际无意义 operation_type_desc = models.CharField("操作类型描述", max_length=10) operation_info = models.CharField("操作信息", max_length=1000) operator = models.CharField("操作人", max_length=30) diff --git a/sql/query_privileges.py b/sql/query_privileges.py index f00aa5d55b..162de3f943 100644 --- a/sql/query_privileges.py +++ b/sql/query_privileges.py @@ -20,13 +20,13 @@ from django_q.tasks import async_task from common.config import SysConfig -from common.utils.const import WorkflowStatus, WorkflowType +from common.utils.const import WorkflowStatus, WorkflowType, WorkflowAction from common.utils.extend_json_encoder import ExtendJSONEncoder from sql.engines.goinception import GoInceptionEngine from sql.models import QueryPrivilegesApply, QueryPrivileges, Instance, ResourceGroup from sql.notify import notify_for_audit from sql.utils.resource_group import user_groups, user_instances -from sql.utils.workflow_audit import Audit +from sql.utils.workflow_audit import Audit, AuditException, get_auditor from sql.utils.sql_utils import extract_tables logger = logging.getLogger("default") @@ -259,55 +259,46 @@ def query_priv_apply(request): result["msg"] = f"你已拥有{instance_name}实例{db_name}.{tb_name}表的查询权限,不能重复申请" return HttpResponse(json.dumps(result), content_type="application/json") + apply_info = QueryPrivilegesApply( + title=title, + group_id=group_id, + group_name=group_name, + # audit_auth_groups 暂时设置为空 + audit_auth_groups="", + user_name=user.username, + user_display=user.display, + instance=ins, + priv_type=int(priv_type), + valid_date=valid_date, + status=WorkflowStatus.WAITING, + limit_num=limit_num, + ) + if int(priv_type) == 1: + apply_info.db_list = ",".join(db_list) + apply_info.table_list = "" + elif int(priv_type) == 2: + apply_info.db_list = db_name + apply_info.table_list = ",".join(table_list) + audit_handler = get_auditor(workflow=apply_info) # 使用事务保持数据一致性 try: with transaction.atomic(): - # 保存申请信息到数据库 - applyinfo = QueryPrivilegesApply( - title=title, - group_id=group_id, - group_name=group_name, - audit_auth_groups=Audit.settings(group_id, WorkflowType.QUERY), - user_name=user.username, - user_display=user.display, - instance=ins, - priv_type=int(priv_type), - valid_date=valid_date, - status=WorkflowStatus.WAITING, - limit_num=limit_num, - ) - if int(priv_type) == 1: - applyinfo.db_list = ",".join(db_list) - applyinfo.table_list = "" - elif int(priv_type) == 2: - applyinfo.db_list = db_name - applyinfo.table_list = ",".join(table_list) - applyinfo.save() - apply_id = applyinfo.apply_id - - # 调用工作流插入审核信息,查询权限申请workflow_type=1 - audit_result = Audit.add(WorkflowType.QUERY, apply_id) - if audit_result["status"] == 0: - # 更新业务表审核状态,判断是否插入权限信息 - _query_apply_audit_call_back( - apply_id, audit_result["data"]["workflow_status"] - ) - except Exception as msg: - logger.error(traceback.format_exc()) + audit_handler.create_audit() + except AuditException as e: + logger.error(f"新建审批流失败, {str(e)}") result["status"] = 1 - result["msg"] = str(msg) - else: - result = audit_result - # 消息通知 - workflow_audit = Audit.detail_by_workflow_id( - workflow_id=apply_id, workflow_type=WorkflowType.QUERY - ) - async_task( - notify_for_audit, - workflow_audit=workflow_audit, - timeout=60, - task_name=f"query-priv-apply-{apply_id}", - ) + result["msg"] = "新建审批流失败, 请联系管理员" + return HttpResponse(json.dumps(result), content_type="application/json") + _query_apply_audit_call_back( + audit_handler.workflow.apply_id, audit_handler.audit.current_status + ) + # 消息通知 + async_task( + notify_for_audit, + workflow_audit=audit_handler.audit, + timeout=60, + task_name=f"query-priv-apply-{audit_handler.workflow.apply_id}", + ) return HttpResponse(json.dumps(result), content_type="application/json") @@ -424,50 +415,42 @@ def query_priv_audit(request): # 获取用户信息 user = request.user apply_id = int(request.POST["apply_id"]) - audit_status = int(request.POST["audit_status"]) + try: + audit_status = WorkflowAction(int(request.POST["audit_status"])) + except ValueError as e: + return render(request, "error.html", {"errMsg": f"audit_status 参数错误, {str(e)}"}) audit_remark = request.POST.get("audit_remark") - if audit_remark is None: + if not audit_remark: audit_remark = "" - if Audit.can_review(request.user, apply_id, 1) is False: - context = {"errMsg": "你无权操作当前工单!"} - return render(request, "error.html", context) - - # 使用事务保持数据一致性 try: - with transaction.atomic(): - audit_id = Audit.detail_by_workflow_id( - workflow_id=apply_id, workflow_type=WorkflowType.QUERY - ).audit_id - - # 调用工作流接口审核 - audit_result, workflow_audit_detail = Audit.audit( - audit_id, audit_status, user.username, audit_remark + sql_query_apply = QueryPrivilegesApply.objects.get(apply_id=apply_id) + except QueryPrivilegesApply.DoesNotExist: + return render(request, "error.html", {"errMsg": "工单不存在"}) + auditor = get_auditor(workflow=sql_query_apply) + # 使用事务保持数据一致性 + with transaction.atomic(): + try: + workflow_audit_detail = auditor.operate( + audit_status, request.user, audit_remark + ) + except AuditException as e: + return render(request, "error.html", {"errMsg": f"审核失败: {str(e)}"}) + if auditor.audit.current_status == WorkflowStatus.PASSED: + # 通过了, 授权 + _query_apply_audit_call_back( + auditor.audit.workflow_id, auditor.audit.current_status ) - # 按照审核结果更新业务表审核状态 - audit_detail = Audit.detail(audit_id) - if audit_detail.workflow_type == WorkflowType.QUERY: - # 更新业务表审核状态,插入权限信息 - _query_apply_audit_call_back( - audit_detail.workflow_id, audit_result["data"]["workflow_status"] - ) - - except Exception as msg: - logger.error(traceback.format_exc()) - context = {"errMsg": msg} - return render(request, "error.html", context) - else: - # 消息通知 - audit_detail.refresh_from_db() - async_task( - notify_for_audit, - workflow_audit=audit_detail, - workflow_audit_detail=workflow_audit_detail, - timeout=60, - task_name=f"query-priv-audit-{apply_id}", - ) + # 消息通知 + async_task( + notify_for_audit, + workflow_audit=auditor.audit, + workflow_audit_detail=workflow_audit_detail, + timeout=60, + task_name=f"query-priv-audit-{apply_id}", + ) return HttpResponseRedirect(reverse("sql:queryapplydetail", args=(apply_id,))) @@ -496,6 +479,8 @@ def _db_priv(user, instance, db_name): :return: 权限存在则返回对应权限的limit_num,否则返回False TODO 返回统一为 int 类型, 不存在返回0 (虽然其实在python中 0==False) """ + if user.is_superuser: + return int(SysConfig().get("admin_query_limit", 5000)) # 获取用户库权限 user_privileges = QueryPrivileges.objects.filter( user_name=user.username, @@ -505,11 +490,8 @@ def _db_priv(user, instance, db_name): is_deleted=0, priv_type=1, ) - if user.is_superuser: - return int(SysConfig().get("admin_query_limit", 5000)) - else: - if user_privileges.exists(): - return user_privileges.first().limit_num + if user_privileges.exists(): + return user_privileges.first().limit_num return False diff --git a/sql/sql_workflow.py b/sql/sql_workflow.py index a05e8052e9..584602aced 100644 --- a/sql/sql_workflow.py +++ b/sql/sql_workflow.py @@ -14,7 +14,7 @@ from django_q.tasks import async_task from common.config import SysConfig -from common.utils.const import WorkflowStatus, WorkflowType +from common.utils.const import WorkflowStatus, WorkflowType, WorkflowAction from common.utils.extend_json_encoder import ExtendJSONEncoder from sql.engines import get_engine from sql.engines.models import ReviewResult, ReviewSet @@ -29,7 +29,7 @@ can_rollback, ) from sql.utils.tasks import add_sql_schedule, del_schedule -from sql.utils.workflow_audit import Audit +from sql.utils.workflow_audit import Audit, get_auditor, AuditException from .models import SqlWorkflow logger = logging.getLogger("default") @@ -238,54 +238,40 @@ def passed(request): if workflow_id == 0: context = {"errMsg": "workflow_id参数为空."} return render(request, "error.html", context) + try: + sql_workflow = SqlWorkflow.objects.get(id=workflow_id) + except SqlWorkflow.DoesNotExist: + return render(request, "error.html", {"errMsg": "工单不存在"}) - user = request.user - if Audit.can_review(user, workflow_id, 2) is False: - context = {"errMsg": "你无权操作当前工单!"} - return render(request, "error.html", context) - + sys_config = SysConfig() + auditor = get_auditor(workflow=sql_workflow, sys_config=sys_config) # 使用事务保持数据一致性 - try: - with transaction.atomic(): - # 调用工作流接口审核 - workflow_audit = Audit.detail_by_workflow_id( - workflow_id=workflow_id, - workflow_type=WorkflowType.SQL_REVIEW, - ) - audit_id = workflow_audit.audit_id - audit_result, audit_detail = Audit.audit( - audit_id, - WorkflowStatus.PASSED, - user.username, - audit_remark, + with transaction.atomic(): + try: + workflow_audit_detail = auditor.operate( + WorkflowAction.PASS, request.user, audit_remark ) - - # 按照审核结果更新业务表审核状态 - if audit_result["data"]["workflow_status"] == WorkflowStatus.PASSED: - # 将流程状态修改为审核通过 - SqlWorkflow(id=workflow_id, status="workflow_review_pass").save( - update_fields=["status"] - ) - except Exception as msg: - logger.error(f"审核工单报错,错误信息:{traceback.format_exc()}") - context = {"errMsg": msg} - return render(request, "error.html", context) - else: - # 开启了Pass阶段通知参数才发送消息通知 - sys_config = SysConfig() - is_notified = ( - "Pass" in sys_config.get("notify_phase_control").split(",") - if sys_config.get("notify_phase_control") - else True + except AuditException as e: + return render(request, "error.html", {"errMsg": f"审核失败, 错误信息: {str(e)}"}) + if auditor.audit.current_status == WorkflowStatus.PASSED: + # 审批流全部走完了, 把工单标记为审核通过 + auditor.workflow.status = "workflow_review_pass" + auditor.workflow.save() + + # 开启了Pass阶段通知参数才发送消息通知 + is_notified = ( + "Pass" in sys_config.get("notify_phase_control").split(",") + if sys_config.get("notify_phase_control") + else True + ) + if is_notified: + async_task( + notify_for_audit, + workflow_audit=auditor.audit, + workflow_audit_detail=workflow_audit_detail, + timeout=60, + task_name=f"sqlreview-pass-{workflow_id}", ) - if is_notified: - async_task( - notify_for_audit, - workflow_audit=workflow_audit, - workflow_audit_detail=audit_detail, - timeout=60, - task_name=f"sqlreview-pass-{workflow_id}", - ) return HttpResponseRedirect(reverse("sql:detail", args=(workflow_id,))) @@ -460,86 +446,40 @@ def cancel(request): return render(request, "error.html", context) # 使用事务保持数据一致性 - try: - with transaction.atomic(): - # 调用工作流接口取消或者驳回 - workflow_audit = Audit.detail_by_workflow_id( - workflow_id=workflow_id, - workflow_type=WorkflowType.SQL_REVIEW, - ) - audit_id = workflow_audit.audit_id - # 仅待审核的需要调用工作流,审核通过的不需要 - if sql_workflow.status != "workflow_manreviewing": - # 增加工单日志 - if user.username == sql_workflow.engineer: - Audit.add_log( - audit_id=audit_id, - operation_type=3, - operation_type_desc="取消执行", - operation_info="取消原因:{}".format(audit_remark), - operator=request.user.username, - operator_display=request.user.display, - ) - else: - Audit.add_log( - audit_id=audit_id, - operation_type=2, - operation_type_desc="审批不通过", - operation_info="审批备注:{}".format(audit_remark), - operator=request.user.username, - operator_display=request.user.display, - ) - else: - if user.username == sql_workflow.engineer: - _, workflow_audit_detail = Audit.audit( - audit_id, - WorkflowStatus.ABORTED, - user.username, - audit_remark, - ) - # 非提交人需要校验审核权限 - elif user.has_perm("sql.sql_review"): - _, workflow_audit_detail = Audit.audit( - audit_id, - WorkflowStatus.REJECTED, - user.username, - audit_remark, - ) - else: - raise PermissionDenied - - # 删除定时执行task - if sql_workflow.status == "workflow_timingtask": - schedule_name = f"sqlreview-timing-{workflow_id}" - del_schedule(schedule_name) - # 将流程状态修改为人工终止流程 - sql_workflow.status = "workflow_abort" - sql_workflow.save() - except Exception as msg: - logger.error(f"取消工单报错,错误信息:{traceback.format_exc()}") - context = {"errMsg": msg} - return render(request, "error.html", context) + if user.username == sql_workflow.engineer: + action = WorkflowAction.ABORT + elif user.has_perm("sql.sql_review"): + action = WorkflowAction.REJECT else: - # 发送取消、驳回通知,开启了Cancel阶段通知参数才发送消息通知 - sys_config = SysConfig() - is_notified = ( - "Cancel" in sys_config.get("notify_phase_control").split(",") - if sys_config.get("notify_phase_control") - else True + raise PermissionDenied + with transaction.atomic(): + auditor = get_auditor(workflow=sql_workflow) + try: + workflow_audit_detail = auditor.operate(action, request.user, audit_remark) + except AuditException as e: + logger.error(f"取消工单报错,错误信息:{traceback.format_exc()}") + return render(request, "error.html", {"errMsg": f"{str(e)}"}) + # 将流程状态修改为人工终止流程 + sql_workflow.status = "workflow_abort" + sql_workflow.save() + # 删除定时执行task + if sql_workflow.status == "workflow_timingtask": + del_schedule(f"sqlreview-timing-{workflow_id}") + # 发送取消、驳回通知,开启了Cancel阶段通知参数才发送消息通知 + sys_config = SysConfig() + is_notified = ( + "Cancel" in sys_config.get("notify_phase_control").split(",") + if sys_config.get("notify_phase_control") + else True + ) + if is_notified: + async_task( + notify_for_audit, + workflow_audit=auditor.audit, + workflow_audit_detail=workflow_audit_detail, + timeout=60, + task_name=f"sqlreview-cancel-{workflow_id}", ) - if is_notified: - workflow_audit.refresh_from_db() - if workflow_audit.current_status in ( - WorkflowStatus.ABORTED, - WorkflowStatus.REJECTED, - ): - async_task( - notify_for_audit, - workflow_audit=workflow_audit, - workflow_audit_detail=workflow_audit_detail, - timeout=60, - task_name=f"sqlreview-cancel-{workflow_id}", - ) return HttpResponseRedirect(reverse("sql:detail", args=(workflow_id,))) diff --git a/sql/test_model.py b/sql/test_model.py new file mode 100644 index 0000000000..c2b9d1dc8a --- /dev/null +++ b/sql/test_model.py @@ -0,0 +1,9 @@ +"""models.py 的补充测试""" + +from sql.models import InstanceTag + + +def test_instance_tag_str(): + i = InstanceTag(tag_name="test") + + assert str(i) == "test" diff --git a/sql/tests.py b/sql/tests.py index 5fa7bc25f7..9777acac36 100644 --- a/sql/tests.py +++ b/sql/tests.py @@ -9,7 +9,7 @@ import sql.query_privileges from common.config import SysConfig -from common.utils.const import WorkflowStatus +from common.utils.const import WorkflowStatus, WorkflowType from sql.archiver import add_archive_task, archive from sql.binlog import my2sql_file from sql.engines.models import ResultSet @@ -29,7 +29,9 @@ WorkflowLog, WorkflowAuditSetting, ArchiveConfig, + WorkflowAuditDetail, ) +from sql.utils.workflow_audit import AuditException User = Users @@ -1418,6 +1420,19 @@ def setUp(self): db_name="some_db", syntax_type=1, ) + self.audit_flow = WorkflowAudit.objects.create( + group_id=1, + group_name="g1", + workflow_id=self.wf2.id, + workflow_type=WorkflowType.SQL_REVIEW, + workflow_title="123", + audit_auth_groups="123", + current_audit="", + next_audit="", + current_status=WorkflowStatus.WAITING, + create_user="", + create_user_display="", + ) self.wfc2 = SqlWorkflowContent.objects.create( workflow=self.wf2, sql_content="some_sql", @@ -1583,36 +1598,32 @@ def testWorkflowListViewFilter(self): self.assertEqual(r_json["total"], 2) @patch("sql.notify.auto_notify") - @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") - @patch("sql.utils.workflow_audit.Audit.audit") - @patch("sql.utils.workflow_audit.Audit.can_review") - def testWorkflowPassedView(self, _can_review, _audit, _detail_by_id, _): + @patch("sql.utils.workflow_audit.AuditV2.operate") + def testWorkflowPassedView(self, mock_operate, _): """测试审核工单""" c = Client() c.force_login(self.superuser1) r = c.post("/passed/") self.assertContains(r, "workflow_id参数为空.") - _can_review.return_value = False - r = c.post("/passed/", {"workflow_id": self.wf1.id}) - self.assertContains(r, "你无权操作当前工单!") - _can_review.return_value = True - mock_audit_detail = PickableMock() - mock_audit_detail.audit_id = 123 - _detail_by_id.return_value = mock_audit_detail - _audit.return_value = ( - {"data": {"workflow_status": 1}}, - {"foo": "bar"}, - ) # TODO 改为audit_success + mock_operate.side_effect = AuditException("mock audit failed") + r = c.post("/passed/", {"workflow_id": self.wf2.id}) + self.assertContains(r, "mock audit failed") + mock_operate.reset_mock(side_effect=True) + mock_operate.return_value = None + # 因为 operate 被 mock 了, 为了测试审批流通过, 这里把审批流手动设置为通过, 仅 测试 view 层的逻辑 + # audit operate 的测试由其他测试覆盖 + self.audit_flow.current_status = WorkflowStatus.PASSED + self.audit_flow.save() r = c.post( "/passed/", - data={"workflow_id": self.wf1.id, "audit_remark": "some_audit"}, + data={"workflow_id": self.wf2.id, "audit_remark": "some_audit"}, follow=False, ) self.assertRedirects( - r, "/detail/{}/".format(self.wf1.id), fetch_redirect_response=False + r, "/detail/{}/".format(self.wf2.id), fetch_redirect_response=False ) - self.wf1.refresh_from_db() - self.assertEqual(self.wf1.status, "workflow_review_pass") + self.wf2.refresh_from_db() + self.assertEqual(self.wf2.status, "workflow_review_pass") @patch("sql.sql_workflow.notify_for_execute") @patch("sql.sql_workflow.Audit.add_log") @@ -1633,13 +1644,15 @@ def test_workflow_execute(self, mock_can_excute, _, _1, _2): self.assertEqual("workflow_finish", self.wf2.status) @patch("sql.sql_workflow.Audit.add_log") - @patch("sql.sql_workflow.Audit.detail_by_workflow_id") - @patch("sql.sql_workflow.Audit.audit") + @patch("sql.notify.auto_notify") + @patch("sql.utils.workflow_audit.AuditV2.operate") # patch view里的can_cancel 而不是原始位置的can_cancel ,因为在调用时, 已经 import 了真的 can_cancel ,会导致mock失效 # 在import 静态函数时需要注意这一点, 动态对象因为每次都会重新生成,也可以 mock 原函数/方法/对象 # 参见 : https://docs.python.org/3/library/unittest.mock.html#where-to-patch @patch("sql.sql_workflow.can_cancel") - def testWorkflowCancelView(self, _can_cancel, _audit, _detail_by_id, _add_log): + def testWorkflowCancelView( + self, _can_cancel, mock_audit_operate, mock_notify, _add_log + ): """测试工单驳回、取消""" c = Client() c.force_login(self.u2) @@ -1648,6 +1661,7 @@ def testWorkflowCancelView(self, _can_cancel, _audit, _detail_by_id, _add_log): r = c.post("/cancel/", data={"workflow_id": self.wf2.id}) self.assertContains(r, "终止原因不能为空") _can_cancel.return_value = False + mock_audit_operate.return_value = None r = c.post( "/cancel/", data={"workflow_id": self.wf2.id, "cancel_remark": "some_reason"}, @@ -1655,7 +1669,6 @@ def testWorkflowCancelView(self, _can_cancel, _audit, _detail_by_id, _add_log): self.assertContains(r, "你无权操作当前工单!") _can_cancel.return_value = True _detail_by_id = 123 - _audit.return_value = (None, None) c.post( "/cancel/", data={"workflow_id": self.wf2.id, "cancel_remark": "some_reason"}, @@ -1951,6 +1964,19 @@ def setUp(self): user_name="some_user", user_display="display", ) + self.audit_flow = WorkflowAudit.objects.create( + group_id=1, + group_name="g1", + workflow_id=self.archive_apply.id, + workflow_type=WorkflowType.ARCHIVE, + workflow_title="123", + audit_auth_groups="123", + current_audit="", + next_audit="", + current_status=WorkflowStatus.WAITING, + create_user="", + create_user_display="", + ) self.sys_config = SysConfig() self.client = Client() @@ -2060,7 +2086,8 @@ def test_archive_apply_not_exist_review(self): self.client.force_login(self.superuser) r = self.client.post(path="/archive/apply/", data=data) self.assertDictEqual( - json.loads(r.content), {"data": {}, "msg": "审批流程不能为空,请先配置审批流程", "status": 1} + json.loads(r.content), + {"data": {}, "msg": "新建审批流失败, 请联系管理员", "status": 1}, ) @patch("sql.archiver.async_task") @@ -2090,32 +2117,30 @@ def test_archive_apply(self, _async_task): r = self.client.post(path="/archive/apply/", data=data) self.assertEqual(json.loads(r.content)["status"], 0) - @patch("sql.archiver.Audit") + @patch("sql.utils.workflow_audit.AuditV2.operate") @patch("sql.archiver.async_task") - def test_archive_audit(self, _async_task, _audit): + def test_archive_audit(self, _async_task, mock_operate): """ 测试审核归档实例数据 :return: """ - _audit.detail_by_workflow_id.return_value.audit_id = 1 - _audit.audit.return_value = ( - { - "status": 0, - "msg": "ok", - "data": {"workflow_status": 1}, - }, - None, - ) + mock_operate.return_value = None data = { "archive_id": self.archive_apply.id, "audit_status": WorkflowStatus.PASSED, "audit_remark": "xxxx", } + # operate 被 patch 了, 这里强制设置一下, 走一下流程 + self.audit_flow.current_status = WorkflowStatus.PASSED + self.audit_flow.save() self.client.force_login(self.superuser) r = self.client.post(path="/archive/audit/", data=data) self.assertRedirects( r, f"/archive/{self.archive_apply.id}/", fetch_redirect_response=False ) + self.archive_apply.refresh_from_db() + assert self.archive_apply.state == True + assert self.archive_apply.status == WorkflowStatus.PASSED @patch("sql.archiver.async_task") def test_add_archive_task(self, _async_task): @@ -3116,7 +3141,7 @@ def dummy(s): self.assertEqual(r.json()["status"], 1) @patch("sql.data_dictionary.get_engine") - def oracle_test_export_instance(self, _get_engine): + def test_oracle_export_instance(self, _get_engine): """ oracle元数据测试导出 :return: diff --git a/sql/utils/test_workflow_audit.py b/sql/utils/test_workflow_audit.py new file mode 100644 index 0000000000..21176e10cf --- /dev/null +++ b/sql/utils/test_workflow_audit.py @@ -0,0 +1,510 @@ +import datetime +from unittest.mock import patch + +import pytest +from pytest_mock import MockFixture +from django.contrib.auth.models import Permission, Group +from django.test import TestCase + +from common.config import SysConfig +from common.utils.const import WorkflowStatus, WorkflowType, WorkflowAction +from sql.models import ( + Instance, + ResourceGroup, + SqlWorkflow, + SqlWorkflowContent, + QueryPrivilegesApply, + ArchiveConfig, + WorkflowAudit, + WorkflowLog, + WorkflowAuditDetail, + WorkflowAuditSetting, +) +from sql.utils.tests import User +from sql.utils.workflow_audit import Audit, AuditV2, AuditSetting, AuditException + + +class TestAudit(TestCase): + def setUp(self): + self.sys_config = SysConfig() + self.user = User.objects.create( + username="test_user", display="中文显示", is_active=True + ) + self.su = User.objects.create( + username="s_user", display="中文显示", is_active=True, is_superuser=True + ) + tomorrow = datetime.datetime.today() + datetime.timedelta(days=1) + self.ins = Instance.objects.create( + instance_name="some_ins", + type="slave", + db_type="mysql", + host="some_host", + port=3306, + user="ins_user", + password="some_str", + ) + self.res_group = ResourceGroup.objects.create( + group_id=1, group_name="group_name" + ) + self.wf = SqlWorkflow.objects.create( + workflow_name="some_name", + group_id=1, + group_name="g1", + engineer_display="", + audit_auth_groups="some_audit_group", + create_time=datetime.datetime.now(), + status="workflow_timingtask", + is_backup=True, + instance=self.ins, + db_name="some_db", + syntax_type=1, + ) + self.own_wf = SqlWorkflow.objects.create( + workflow_name="some_name", + group_id=1, + group_name="g1", + engineer=self.user.username, + audit_auth_groups="some_audit_group", + create_time=datetime.datetime.now(), + status="workflow_timingtask", + is_backup=True, + instance=self.ins, + db_name="some_db", + syntax_type=1, + ) + SqlWorkflowContent.objects.create( + workflow=self.wf, sql_content="some_sql", execute_result="" + ) + self.query_apply_1 = QueryPrivilegesApply.objects.create( + group_id=1, + group_name="some_name", + title="some_title1", + user_name="some_user", + instance=self.ins, + db_list="some_db,some_db2", + limit_num=100, + valid_date=tomorrow, + priv_type=1, + status=0, + audit_auth_groups="some_audit_group", + ) + self.archive_apply_1 = ArchiveConfig.objects.create( + title="title", + resource_group=self.res_group, + audit_auth_groups="some_audit_group", + src_instance=self.ins, + src_db_name="src_db_name", + src_table_name="src_table_name", + dest_instance=self.ins, + dest_db_name="src_db_name", + dest_table_name="src_table_name", + condition="1=1", + mode="file", + no_delete=True, + sleep=1, + status=WorkflowStatus.WAITING, + state=False, + user_name="some_user", + user_display="display", + ) + self.audit = WorkflowAudit.objects.create( + group_id=1, + group_name="some_group", + workflow_id=1, + workflow_type=1, + workflow_title="申请标题", + workflow_remark="申请备注", + audit_auth_groups="1,2,3", + current_audit="1", + next_audit="2", + current_status=0, + ) + self.wl = WorkflowLog.objects.create( + audit_id=self.audit.audit_id, operation_type=1 + ) + + def tearDown(self): + self.sys_config.purge() + User.objects.all().delete() + SqlWorkflow.objects.all().delete() + SqlWorkflowContent.objects.all().delete() + WorkflowAudit.objects.all().delete() + WorkflowAuditDetail.objects.all().delete() + WorkflowAuditSetting.objects.all().delete() + QueryPrivilegesApply.objects.all().delete() + WorkflowLog.objects.all().delete() + ResourceGroup.objects.all().delete() + ArchiveConfig.objects.all().delete() + + @patch("sql.utils.workflow_audit.user_groups", return_value=[]) + def test_todo(self, _user_groups): + """TODO 测试todo数量,未断言""" + Audit.todo(self.user) + Audit.todo(self.su) + + def test_detail(self): + """测试获取审核信息""" + result = Audit.detail(self.audit.audit_id) + self.assertEqual(result, self.audit) + result = Audit.detail(0) + self.assertEqual(result, None) + + def test_detail_by_workflow_id(self): + """测试通过业务id获取审核信息""" + self.audit.workflow_type = WorkflowType.SQL_REVIEW + self.audit.workflow_id = self.wf.id + self.audit.save() + result = Audit.detail_by_workflow_id(self.wf.id, WorkflowType.SQL_REVIEW) + self.assertEqual(result, self.audit) + result = Audit.detail_by_workflow_id(0, 0) + self.assertEqual(result, None) + + def test_settings(self): + """测试通过组和审核类型,获取审核配置信息""" + WorkflowAuditSetting.objects.create( + workflow_type=1, group_id=1, audit_auth_groups="1,2,3" + ) + result = Audit.settings(workflow_type=1, group_id=1) + self.assertEqual(result, "1,2,3") + result = Audit.settings(0, 0) + self.assertEqual(result, None) + + def test_change_settings_edit(self): + """修改配置信息""" + ws = WorkflowAuditSetting.objects.create( + workflow_type=1, group_id=1, audit_auth_groups="1,2,3" + ) + Audit.change_settings(workflow_type=1, group_id=1, audit_auth_groups="1,2") + ws = WorkflowAuditSetting.objects.get(audit_setting_id=ws.audit_setting_id) + self.assertEqual(ws.audit_auth_groups, "1,2") + + def test_change_settings_add(self): + """添加配置信息""" + Audit.change_settings(workflow_type=1, group_id=1, audit_auth_groups="1,2") + ws = WorkflowAuditSetting.objects.get(workflow_type=1, group_id=1) + self.assertEqual(ws.audit_auth_groups, "1,2") + + @patch("sql.utils.workflow_audit.auth_group_users") + @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") + def test_can_review_sql_review(self, _detail_by_workflow_id, _auth_group_users): + """测试判断用户当前是否是可审核上线工单,非管理员拥有权限""" + sql_review = Permission.objects.get(codename="sql_review") + self.user.user_permissions.add(sql_review) + aug = Group.objects.create(name="auth_group") + _detail_by_workflow_id.return_value.current_audit = aug.id + _auth_group_users.return_value.filter.exists = True + self.audit.workflow_type = WorkflowType.SQL_REVIEW + self.audit.workflow_id = self.wf.id + self.audit.save() + r = Audit.can_review( + self.user, self.audit.workflow_id, self.audit.workflow_type + ) + self.assertEqual(r, True) + + @patch("sql.utils.workflow_audit.auth_group_users") + @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") + def test_cannot_review_self_sql_review( + self, _detail_by_workflow_id, _auth_group_users + ): + """测试确认用户不能审核自己提交的上线工单,非管理员拥有权限""" + self.sys_config.set("ban_self_audit", "true") + sql_review = Permission.objects.get(codename="sql_review") + self.user.user_permissions.add(sql_review) + aug = Group.objects.create(name="auth_group") + _detail_by_workflow_id.return_value.current_audit = aug.id + _auth_group_users.return_value.filter.exists = True + self.audit.workflow_type = WorkflowType.SQL_REVIEW + self.audit.workflow_id = self.own_wf.id + self.audit.save() + r = Audit.can_review( + self.user, self.audit.workflow_id, self.audit.workflow_type + ) + self.assertEqual(r, False) + + @patch("sql.utils.workflow_audit.auth_group_users") + @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") + def test_can_review_query_review(self, _detail_by_workflow_id, _auth_group_users): + """测试判断用户当前是否是可审核查询工单,非管理员拥有权限""" + query_review = Permission.objects.get(codename="query_review") + self.user.user_permissions.add(query_review) + aug = Group.objects.create(name="auth_group") + _detail_by_workflow_id.return_value.current_audit = aug.id + _auth_group_users.return_value.filter.exists = True + self.audit.workflow_type = WorkflowType.QUERY + self.audit.workflow_id = self.query_apply_1.apply_id + self.audit.save() + r = Audit.can_review( + self.user, self.audit.workflow_id, self.audit.workflow_type + ) + self.assertEqual(r, True) + + @patch("sql.utils.workflow_audit.auth_group_users") + @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") + def test_can_review_sql_review_super( + self, _detail_by_workflow_id, _auth_group_users + ): + """测试判断用户当前是否是可审核查询工单,用户是管理员""" + aug = Group.objects.create(name="auth_group") + _detail_by_workflow_id.return_value.current_audit = aug.id + _auth_group_users.return_value.filter.exists = True + self.audit.workflow_type = WorkflowType.SQL_REVIEW + self.audit.workflow_id = self.wf.id + self.audit.save() + r = Audit.can_review(self.su, self.audit.workflow_id, self.audit.workflow_type) + self.assertEqual(r, True) + + @patch("sql.utils.workflow_audit.auth_group_users") + @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") + def test_can_review_wrong_status(self, _detail_by_workflow_id, _auth_group_users): + """测试判断用户当前是否是可审核,非待审核工单""" + aug = Group.objects.create(name="auth_group") + _detail_by_workflow_id.return_value.current_audit = aug.id + _auth_group_users.return_value.filter.exists = True + self.audit.workflow_type = WorkflowType.SQL_REVIEW + self.audit.workflow_id = self.wf.id + self.audit.current_status = WorkflowStatus.PASSED + self.audit.save() + r = Audit.can_review( + self.user, self.audit.workflow_id, self.audit.workflow_type + ) + self.assertEqual(r, False) + + @patch("sql.utils.workflow_audit.auth_group_users") + @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") + def test_can_review_no_prem(self, _detail_by_workflow_id, _auth_group_users): + """测试判断用户当前是否是可审核,普通用户无权限""" + aug = Group.objects.create(name="auth_group") + _detail_by_workflow_id.return_value.current_audit = aug.id + _auth_group_users.return_value.filter.exists = True + self.audit.workflow_type = WorkflowType.SQL_REVIEW + self.audit.workflow_id = self.wf.id + self.audit.save() + r = Audit.can_review( + self.user, self.audit.workflow_id, self.audit.workflow_type + ) + self.assertEqual(r, False) + + @patch("sql.utils.workflow_audit.auth_group_users") + @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") + def test_can_review_no_prem_exception( + self, _detail_by_workflow_id, _auth_group_users + ): + """测试判断用户当前是否是可审核,权限组不存在""" + Group.objects.create(name="auth_group") + _detail_by_workflow_id.side_effect = RuntimeError() + _auth_group_users.return_value.filter.exists = True + self.audit.workflow_type = WorkflowType.SQL_REVIEW + self.audit.workflow_id = self.wf.id + self.audit.save() + with self.assertRaisesMessage(Exception, "当前审批auth_group_id不存在,请检查并清洗历史数据"): + Audit.can_review( + self.user, self.audit.workflow_id, self.audit.workflow_type + ) + + def test_review_info_no_review(self): + """测试获取当前工单审批流程和当前审核组,无需审批""" + self.audit.workflow_type = WorkflowType.SQL_REVIEW + self.audit.workflow_id = self.wf.id + self.audit.audit_auth_groups = "" + self.audit.current_audit = "-1" + self.audit.save() + audit_auth_group, current_audit_auth_group = Audit.review_info( + self.audit.workflow_id, self.audit.workflow_type + ) + self.assertEqual(audit_auth_group, "无需审批") + self.assertEqual(current_audit_auth_group, None) + + def test_review_info(self): + """测试获取当前工单审批流程和当前审核组,无需审批""" + aug = Group.objects.create(name="DBA") + self.audit.workflow_type = WorkflowType.SQL_REVIEW + self.audit.workflow_id = self.wf.id + self.audit.audit_auth_groups = str(aug.id) + self.audit.current_audit = str(aug.id) + self.audit.save() + audit_auth_group, current_audit_auth_group = Audit.review_info( + self.audit.workflow_id, self.audit.workflow_type + ) + self.assertEqual(audit_auth_group, "DBA") + self.assertEqual(current_audit_auth_group, "DBA") + + def test_logs(self): + """测试获取工单日志""" + r = Audit.logs(self.audit.audit_id).first() + self.assertEqual(r, self.wl) + + +# AuditV2 测试 +def test_create_audit( + sql_workflow, sql_query_apply, archive_apply, resource_group, mocker: MockFixture +): + """测试正常创建, 可正常获取到一个 audit_setting""" + mock_generate_audit_setting = mocker.patch.object(AuditV2, "generate_audit_setting") + fake_audit_setting = AuditSetting( + auto_pass=False, + audit_auth_groups=[123], + ) + mock_generate_audit_setting.return_value = fake_audit_setting + + workflow, workflow_content = sql_workflow + audit = AuditV2(workflow=workflow) + audit.create_audit() + workflow.refresh_from_db() + assert workflow.audit_auth_groups == fake_audit_setting.audit_auth_group_in_db + + audit = AuditV2(workflow=sql_query_apply) + audit.create_audit() + sql_query_apply.refresh_from_db() + assert ( + sql_query_apply.audit_auth_groups == fake_audit_setting.audit_auth_group_in_db + ) + + audit = AuditV2( + workflow=archive_apply, + resource_group=resource_group.group_name, + resource_group_id=resource_group.group_id, + ) + audit.create_audit() + archive_apply.refresh_from_db() + assert archive_apply.audit_auth_groups == fake_audit_setting.audit_auth_group_in_db + + +def test_init_no_workflow_and_audit(): + with pytest.raises(ValueError) as e: + AuditV2() + assert "WorkflowAudit 或 workflow" in str(e.value) + + +def test_archive_init_no_resource_group(archive_apply): + """测试 archive 初始化时指定的资源组不存在""" + with pytest.raises(AuditException) as e: + AuditV2(workflow=archive_apply, resource_group="not_exists_group") + assert "参数错误, 未发现资源组" in str(e.value) + + +def test_duplicate_create(sql_query_apply, fake_generate_audit_setting): + audit = AuditV2(workflow=sql_query_apply) + audit.create_audit() + with pytest.raises(AuditException) as e: + audit.create_audit() + assert "请勿重复提交" in str(e.value) + + +def test_create_audit_auto_pass(sql_workflow, mocker: MockFixture): + workflow, workflow_content = sql_workflow + mock_generate_audit_setting = mocker.patch.object(AuditV2, "generate_audit_setting") + fake_audit_setting = AuditSetting( + auto_pass=True, + audit_auth_groups=[], + ) + mock_generate_audit_setting.return_value = fake_audit_setting + audit = AuditV2(workflow=workflow) + audit.create_audit() + assert audit.audit.current_status == WorkflowStatus.PASSED + + +@pytest.fixture +def fake_generate_audit_setting(mocker: MockFixture): + mock_generate_audit_setting = mocker.patch.object(AuditV2, "generate_audit_setting") + fake_audit_setting = AuditSetting( + auto_pass=False, + audit_auth_groups=[123], + ) + mock_generate_audit_setting.return_value = fake_audit_setting + yield mock_generate_audit_setting + + +@pytest.mark.parametrize( + "status,operation,allowed", + [ + (WorkflowStatus.WAITING, WorkflowAction.SUBMIT, False), + (WorkflowStatus.WAITING, WorkflowAction.PASS, True), + (WorkflowStatus.WAITING, WorkflowAction.REJECT, True), + (WorkflowStatus.WAITING, WorkflowAction.EXECUTE_START, False), + (WorkflowStatus.PASSED, WorkflowAction.REJECT, True), + (WorkflowStatus.PASSED, WorkflowAction.PASS, False), + (WorkflowStatus.REJECTED, WorkflowAction.PASS, False), + (WorkflowStatus.ABORTED, WorkflowAction.PASS, False), + ], +) +def test_supported_operate( + sql_query_apply, + status, + super_user, + operation, + allowed: bool, + fake_generate_audit_setting, +): + audit = AuditV2(workflow=sql_query_apply) + audit.create_audit() + audit.audit.current_status = status + audit.audit.save() + if not allowed: + with pytest.raises(AuditException) as e: + audit.operate(operation, super_user, "test") + assert "不允许的操作" in str(e.value) + else: + result = audit.operate(operation, super_user, "test") + assert isinstance(result, WorkflowAuditDetail) + assert result.audit_id == audit.audit.audit_id + # 在 log 表里找对于的记录 + log = WorkflowLog.objects.filter( + audit_id=audit.audit.audit_id, operation_type=operation + ).all() + assert len(log) == 1 + + +def test_pass_has_next_level(sql_query_apply, super_user, fake_generate_audit_setting): + fake_generate_audit_setting.return_value = AuditSetting( + auto_pass=False, + audit_auth_groups=[1, 2], + ) + audit = AuditV2(workflow=sql_query_apply) + audit.create_audit() + audit.operate(WorkflowAction.PASS, super_user, "ok") + assert audit.audit.current_status == WorkflowStatus.WAITING + assert audit.audit.current_audit == 2 + assert audit.audit.next_audit == "-1" + + +def test_generate_audit_setting_empty_config(sql_query_apply): + audit = AuditV2(workflow=sql_query_apply) + with pytest.raises(AuditException) as e: + audit.generate_audit_setting() + assert "未配置审流" in str(e.value) + + +def test_generate_audit_setting_auto_review( + sql_workflow, setup_sys_config, mocker: MockFixture +): + sql_workflow, _ = sql_workflow + setup_sys_config.set("auto_review", True) + mock_is_auto_review = mocker.patch( + "sql.utils.workflow_audit.is_auto_review", return_value=True + ) + audit = AuditV2(workflow=sql_workflow, sys_config=setup_sys_config) + audit_setting = audit.generate_audit_setting() + assert audit_setting.auto_pass is True + mock_is_auto_review.assert_called() + + +def test_get_workflow( + archive_apply, + sql_query_apply, + sql_workflow, + resource_group, + fake_generate_audit_setting, +): + """初始化时只传了 audit, 尝试取 workflow""" + sql_workflow, _ = sql_workflow + for wf in [sql_query_apply, sql_workflow]: + a = AuditV2(workflow=wf) + a.create_audit() + audit_init_with_audit = AuditV2(audit=a.audit) + assert audit_init_with_audit.workflow_type == a.workflow_type + assert audit_init_with_audit.workflow == a.workflow + a = AuditV2(workflow=archive_apply, resource_group=resource_group.group_name) + a.create_audit() + audit_init_with_audit = AuditV2(audit=a.audit) + assert audit_init_with_audit.workflow_type == a.workflow_type + assert audit_init_with_audit.workflow == a.workflow diff --git a/sql/utils/tests.py b/sql/utils/tests.py index 4fa4edfebc..5e490dad18 100644 --- a/sql/utils/tests.py +++ b/sql/utils/tests.py @@ -6,11 +6,6 @@ @time: 2019/03/14 """ -import os -import sys -import os -import django - import datetime import json from unittest.mock import patch, MagicMock @@ -21,8 +16,7 @@ from django_q.models import Schedule from common.config import SysConfig -from common.utils.const import WorkflowStatus, WorkflowType -from sql.engines.models import ReviewResult, ReviewSet, SqlItem +from sql.engines.models import ReviewResult, ReviewSet from sql.models import ( Users, SqlWorkflow, @@ -30,14 +24,9 @@ Instance, ResourceGroup, WorkflowLog, - WorkflowAudit, - WorkflowAuditDetail, - WorkflowAuditSetting, - QueryPrivilegesApply, DataMaskingRules, DataMaskingColumns, InstanceTag, - ArchiveConfig, ) from sql.utils.resource_group import user_groups, user_instances, auth_group_users from sql.utils.sql_review import ( @@ -50,7 +39,6 @@ from sql.utils.sql_utils import * from sql.utils.execute_sql import execute, execute_callback from sql.utils.tasks import add_sql_schedule, del_schedule, task_info -from sql.utils.workflow_audit import Audit from sql.utils.data_masking import data_masking, brute_mask, simple_column_mask User = Users @@ -1065,557 +1053,6 @@ def test_task_info_not_exists(self): Schedule.objects.get(name="some_name1") -class TestAudit(TestCase): - def setUp(self): - self.sys_config = SysConfig() - self.user = User.objects.create( - username="test_user", display="中文显示", is_active=True - ) - self.su = User.objects.create( - username="s_user", display="中文显示", is_active=True, is_superuser=True - ) - tomorrow = datetime.datetime.today() + datetime.timedelta(days=1) - self.ins = Instance.objects.create( - instance_name="some_ins", - type="slave", - db_type="mysql", - host="some_host", - port=3306, - user="ins_user", - password="some_str", - ) - self.res_group = ResourceGroup.objects.create( - group_id=1, group_name="group_name" - ) - self.wf = SqlWorkflow.objects.create( - workflow_name="some_name", - group_id=1, - group_name="g1", - engineer_display="", - audit_auth_groups="some_audit_group", - create_time=datetime.datetime.now(), - status="workflow_timingtask", - is_backup=True, - instance=self.ins, - db_name="some_db", - syntax_type=1, - ) - self.own_wf = SqlWorkflow.objects.create( - workflow_name="some_name", - group_id=1, - group_name="g1", - engineer=self.user.username, - audit_auth_groups="some_audit_group", - create_time=datetime.datetime.now(), - status="workflow_timingtask", - is_backup=True, - instance=self.ins, - db_name="some_db", - syntax_type=1, - ) - SqlWorkflowContent.objects.create( - workflow=self.wf, sql_content="some_sql", execute_result="" - ) - self.query_apply_1 = QueryPrivilegesApply.objects.create( - group_id=1, - group_name="some_name", - title="some_title1", - user_name="some_user", - instance=self.ins, - db_list="some_db,some_db2", - limit_num=100, - valid_date=tomorrow, - priv_type=1, - status=0, - audit_auth_groups="some_audit_group", - ) - self.archive_apply_1 = ArchiveConfig.objects.create( - title="title", - resource_group=self.res_group, - audit_auth_groups="some_audit_group", - src_instance=self.ins, - src_db_name="src_db_name", - src_table_name="src_table_name", - dest_instance=self.ins, - dest_db_name="src_db_name", - dest_table_name="src_table_name", - condition="1=1", - mode="file", - no_delete=True, - sleep=1, - status=WorkflowStatus.WAITING, - state=False, - user_name="some_user", - user_display="display", - ) - self.audit = WorkflowAudit.objects.create( - group_id=1, - group_name="some_group", - workflow_id=1, - workflow_type=1, - workflow_title="申请标题", - workflow_remark="申请备注", - audit_auth_groups="1,2,3", - current_audit="1", - next_audit="2", - current_status=0, - ) - self.wl = WorkflowLog.objects.create( - audit_id=self.audit.audit_id, operation_type=1 - ) - - def tearDown(self): - self.sys_config.purge() - User.objects.all().delete() - SqlWorkflow.objects.all().delete() - SqlWorkflowContent.objects.all().delete() - WorkflowAudit.objects.all().delete() - WorkflowAuditDetail.objects.all().delete() - WorkflowAuditSetting.objects.all().delete() - QueryPrivilegesApply.objects.all().delete() - WorkflowLog.objects.all().delete() - ResourceGroup.objects.all().delete() - ArchiveConfig.objects.all().delete() - - def test_audit_add_query(self): - """测试添加查询审核工单""" - result, _ = Audit.add(1, self.query_apply_1.apply_id) - audit_id = result["data"]["audit_id"] - workflow_status = result["data"]["workflow_status"] - self.assertEqual(workflow_status, WorkflowStatus.WAITING) - audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) - # 当前审批 - self.assertEqual(audit_detail.current_audit, "some_audit_group") - # 无下级审批 - self.assertEqual(audit_detail.next_audit, "-1") - # 验证日志 - log_info = WorkflowLog.objects.filter(audit_id=audit_id).first() - self.assertEqual(log_info.operation_type, 0) - self.assertEqual(log_info.operation_type_desc, "提交") - self.assertIn("等待审批,审批流程:", log_info.operation_info) - - def test_audit_add_sqlreview(self): - """测试添加上线审核工单""" - result, _ = Audit.add(2, self.wf.id) - audit_id = result["data"]["audit_id"] - workflow_status = result["data"]["workflow_status"] - self.assertEqual(workflow_status, WorkflowStatus.WAITING) - audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) - # 当前审批 - self.assertEqual(audit_detail.current_audit, "some_audit_group") - # 无下级审批 - self.assertEqual(audit_detail.next_audit, "-1") - # 验证日志 - log_info = WorkflowLog.objects.filter(audit_id=audit_id).first() - self.assertEqual(log_info.operation_type, 0) - self.assertEqual(log_info.operation_type_desc, "提交") - self.assertIn("等待审批,审批流程:", log_info.operation_info) - - def test_audit_add_archive_review(self): - """测试添加数据归档工单""" - result, workflow_audit_detail = Audit.add(3, self.archive_apply_1.id) - audit_id = result["data"]["audit_id"] - workflow_status = result["data"]["workflow_status"] - self.assertEqual(workflow_status, WorkflowStatus.WAITING) - audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) - # 当前审批 - self.assertEqual(audit_detail.current_audit, "some_audit_group") - # 无下级审批 - self.assertEqual(audit_detail.next_audit, "-1") - # 验证日志 - log_info = WorkflowLog.objects.filter(audit_id=audit_id).first() - self.assertEqual(log_info.operation_type, 0) - self.assertEqual(log_info.operation_type_desc, "提交") - self.assertIn("等待审批,审批流程:", log_info.operation_info) - - def test_audit_add_wrong_type(self): - """测试添加不存在的类型""" - with self.assertRaisesMessage(Exception, "工单类型不存在"): - Audit.add(4, 1) - - def test_audit_add_settings_not_exists(self): - """测试审批流程未配置""" - self.wf.audit_auth_groups = "" - self.wf.save() - with self.assertRaisesMessage(Exception, "审批流程不能为空,请先配置审批流程"): - Audit.add(2, self.wf.id) - - def test_audit_add_duplicate(self): - """测试重复提交""" - Audit.add(2, self.wf.id) - with self.assertRaisesMessage(Exception, "该工单当前状态为待审核,请勿重复提交"): - Audit.add(2, self.wf.id) - - @patch("sql.utils.workflow_audit.is_auto_review", return_value=True) - def test_audit_add_auto_review(self, _is_auto_review): - """测试提交自动审核通过""" - self.sys_config.set("auto_review", "true") - result, workflow_audit_detail = Audit.add(2, self.wf.id) - audit_id = result["data"]["audit_id"] - workflow_status = result["data"]["workflow_status"] - self.assertEqual(workflow_status, WorkflowStatus.PASSED) - audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) - # 无下级审批 - self.assertEqual(audit_detail.next_audit, "-1") - # 验证日志 - log_info = WorkflowLog.objects.filter(audit_id=audit_id).first() - self.assertEqual(log_info.operation_type, 0) - self.assertEqual(log_info.operation_type_desc, "提交") - self.assertEqual(log_info.operation_info, "无需审批,系统直接审核通过") - - def test_audit_add_multiple_audit(self): - """测试提交多级审核""" - self.wf.audit_auth_groups = "1,2,3" - self.wf.save() - result, _ = Audit.add(2, self.wf.id) - audit_id = result["data"]["audit_id"] - workflow_status = result["data"]["workflow_status"] - audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) - self.assertEqual(workflow_status, WorkflowStatus.WAITING) - # 存在下级审批 - self.assertEqual(audit_detail.current_audit, "1") - self.assertEqual(audit_detail.next_audit, "2") - # 验证日志 - log_info = WorkflowLog.objects.filter(audit_id=audit_id).first() - self.assertEqual(log_info.operation_type, 0) - self.assertEqual(log_info.operation_type_desc, "提交") - self.assertIn("等待审批,审批流程:", log_info.operation_info) - - def test_audit_success_not_exists_next(self): - """测试审核通过、无下一级""" - self.audit.current_audit = "3" - self.audit.next_audit = "-1" - self.audit.save() - result, _ = Audit.audit( - self.audit.audit_id, - WorkflowStatus.PASSED, - self.user.username, - "通过", - ) - audit_id = self.audit.audit_id - workflow_status = result["data"]["workflow_status"] - audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) - self.assertEqual(workflow_status, WorkflowStatus.PASSED) - # 不存在下级审批 - self.assertEqual(audit_detail.next_audit, "-1") - # 验证日志 - log_info = WorkflowLog.objects.filter(audit_id=audit_id).order_by("-id").first() - self.assertEqual(log_info.operator, self.user.username) - self.assertEqual(log_info.operator_display, self.user.display) - self.assertEqual(log_info.operation_type, 1) - self.assertEqual(log_info.operation_type_desc, "审批通过") - self.assertEqual(log_info.operation_info, f"审批备注:通过,下级审批:None") - - def test_audit_success_exists_next(self): - """测试审核通过、存在下一级""" - self.audit.current_audit = "1" - self.audit.next_audit = "2" - self.audit.save() - result, _ = Audit.audit( - self.audit.audit_id, - WorkflowStatus.PASSED, - self.user.username, - "通过", - ) - audit_id = self.audit.audit_id - workflow_status = result["data"]["workflow_status"] - audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) - self.assertEqual(workflow_status, WorkflowStatus.WAITING) - # 存在下级审批 - self.assertEqual(audit_detail.next_audit, "3") - # 验证日志 - log_info = WorkflowLog.objects.filter(audit_id=audit_id).order_by("-id").first() - self.assertEqual(log_info.operator, self.user.username) - self.assertEqual(log_info.operator_display, self.user.display) - self.assertEqual(log_info.operation_type, 1) - self.assertEqual(log_info.operation_type_desc, "审批通过") - self.assertEqual(log_info.operation_info, f"审批备注:通过,下级审批:2") - - def test_audit_reject(self): - """测试审核不通过""" - result, _ = Audit.audit( - self.audit.audit_id, - WorkflowStatus.REJECTED, - self.user.username, - "不通过", - ) - audit_id = self.audit.audit_id - workflow_status = result["data"]["workflow_status"] - audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) - self.assertEqual(workflow_status, WorkflowStatus.REJECTED) - # 不存在下级审批 - self.assertEqual(audit_detail.next_audit, "-1") - # 验证日志 - log_info = WorkflowLog.objects.filter(audit_id=audit_id).order_by("-id").first() - self.assertEqual(log_info.operator, self.user.username) - self.assertEqual(log_info.operator_display, self.user.display) - self.assertEqual(log_info.operation_type, 2) - self.assertEqual(log_info.operation_type_desc, "审批不通过") - self.assertEqual(log_info.operation_info, f"审批备注:不通过") - - def test_audit_abort(self): - """测试取消审批""" - self.audit.create_user = self.user.username - self.audit.save() - result, _ = Audit.audit( - self.audit.audit_id, - WorkflowStatus.ABORTED, - self.user.username, - "取消", - ) - audit_id = self.audit.audit_id - workflow_status = result["data"]["workflow_status"] - audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) - self.assertEqual(workflow_status, WorkflowStatus.ABORTED) - # 不存在下级审批 - self.assertEqual(audit_detail.next_audit, "-1") - # 验证日志 - log_info = WorkflowLog.objects.filter(audit_id=audit_id).order_by("-id").first() - self.assertEqual(log_info.operator, self.user.username) - self.assertEqual(log_info.operator_display, self.user.display) - self.assertEqual(log_info.operation_type, 3) - self.assertEqual(log_info.operation_type_desc, "审批取消") - self.assertEqual(log_info.operation_info, f"取消原因:取消") - - def test_audit_wrong_exception(self): - """测试审核异常的状态""" - with self.assertRaisesMessage(Exception, "审核异常"): - Audit.audit(self.audit.audit_id, 10, self.user.username, "") - - def test_audit_success_wrong_status(self): - """测试审核通过,当前状态不是待审核""" - self.audit.current_status = 1 - self.audit.save() - with self.assertRaisesMessage(Exception, "工单不是待审核状态,请返回刷新"): - Audit.audit( - self.audit.audit_id, - WorkflowStatus.PASSED, - self.user.username, - "", - ) - - def test_audit_reject_wrong_status(self): - """测试审核不通过,当前状态不是待审核""" - self.audit.current_status = 1 - self.audit.save() - with self.assertRaisesMessage(Exception, "工单不是待审核状态,请返回刷新"): - Audit.audit( - self.audit.audit_id, - WorkflowStatus.REJECTED, - self.user.username, - "", - ) - - def test_audit_abort_wrong_status(self): - """测试审核不通过,当前状态不是待审核""" - self.audit.current_status = 2 - self.audit.save() - with self.assertRaisesMessage(Exception, "工单不是待审核态/审核通过状态,请返回刷新"): - Audit.audit( - self.audit.audit_id, - WorkflowStatus.ABORTED, - self.user.username, - "", - ) - - @patch("sql.utils.workflow_audit.user_groups", return_value=[]) - def test_todo(self, _user_groups): - """TODO 测试todo数量,未断言""" - Audit.todo(self.user) - Audit.todo(self.su) - - def test_detail(self): - """测试获取审核信息""" - result = Audit.detail(self.audit.audit_id) - self.assertEqual(result, self.audit) - result = Audit.detail(0) - self.assertEqual(result, None) - - def test_detail_by_workflow_id(self): - """测试通过业务id获取审核信息""" - self.audit.workflow_type = WorkflowType.SQL_REVIEW - self.audit.workflow_id = self.wf.id - self.audit.save() - result = Audit.detail_by_workflow_id(self.wf.id, WorkflowType.SQL_REVIEW) - self.assertEqual(result, self.audit) - result = Audit.detail_by_workflow_id(0, 0) - self.assertEqual(result, None) - - def test_settings(self): - """测试通过组和审核类型,获取审核配置信息""" - WorkflowAuditSetting.objects.create( - workflow_type=1, group_id=1, audit_auth_groups="1,2,3" - ) - result = Audit.settings(workflow_type=1, group_id=1) - self.assertEqual(result, "1,2,3") - result = Audit.settings(0, 0) - self.assertEqual(result, None) - - def test_change_settings_edit(self): - """修改配置信息""" - ws = WorkflowAuditSetting.objects.create( - workflow_type=1, group_id=1, audit_auth_groups="1,2,3" - ) - Audit.change_settings(workflow_type=1, group_id=1, audit_auth_groups="1,2") - ws = WorkflowAuditSetting.objects.get(audit_setting_id=ws.audit_setting_id) - self.assertEqual(ws.audit_auth_groups, "1,2") - - def test_change_settings_add(self): - """添加配置信息""" - Audit.change_settings(workflow_type=1, group_id=1, audit_auth_groups="1,2") - ws = WorkflowAuditSetting.objects.get(workflow_type=1, group_id=1) - self.assertEqual(ws.audit_auth_groups, "1,2") - - @patch("sql.utils.workflow_audit.auth_group_users") - @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") - def test_can_review_sql_review(self, _detail_by_workflow_id, _auth_group_users): - """测试判断用户当前是否是可审核上线工单,非管理员拥有权限""" - sql_review = Permission.objects.get(codename="sql_review") - self.user.user_permissions.add(sql_review) - aug = Group.objects.create(name="auth_group") - _detail_by_workflow_id.return_value.current_audit = aug.id - _auth_group_users.return_value.filter.exists = True - self.audit.workflow_type = WorkflowType.SQL_REVIEW - self.audit.workflow_id = self.wf.id - self.audit.save() - r = Audit.can_review( - self.user, self.audit.workflow_id, self.audit.workflow_type - ) - self.assertEqual(r, True) - - @patch("sql.utils.workflow_audit.auth_group_users") - @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") - def test_cannot_review_self_sql_review( - self, _detail_by_workflow_id, _auth_group_users - ): - """测试确认用户不能审核自己提交的上线工单,非管理员拥有权限""" - self.sys_config.set("ban_self_audit", "true") - sql_review = Permission.objects.get(codename="sql_review") - self.user.user_permissions.add(sql_review) - aug = Group.objects.create(name="auth_group") - _detail_by_workflow_id.return_value.current_audit = aug.id - _auth_group_users.return_value.filter.exists = True - self.audit.workflow_type = WorkflowType.SQL_REVIEW - self.audit.workflow_id = self.own_wf.id - self.audit.save() - r = Audit.can_review( - self.user, self.audit.workflow_id, self.audit.workflow_type - ) - self.assertEqual(r, False) - - @patch("sql.utils.workflow_audit.auth_group_users") - @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") - def test_can_review_query_review(self, _detail_by_workflow_id, _auth_group_users): - """测试判断用户当前是否是可审核查询工单,非管理员拥有权限""" - query_review = Permission.objects.get(codename="query_review") - self.user.user_permissions.add(query_review) - aug = Group.objects.create(name="auth_group") - _detail_by_workflow_id.return_value.current_audit = aug.id - _auth_group_users.return_value.filter.exists = True - self.audit.workflow_type = WorkflowType.QUERY - self.audit.workflow_id = self.query_apply_1.apply_id - self.audit.save() - r = Audit.can_review( - self.user, self.audit.workflow_id, self.audit.workflow_type - ) - self.assertEqual(r, True) - - @patch("sql.utils.workflow_audit.auth_group_users") - @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") - def test_can_review_sql_review_super( - self, _detail_by_workflow_id, _auth_group_users - ): - """测试判断用户当前是否是可审核查询工单,用户是管理员""" - aug = Group.objects.create(name="auth_group") - _detail_by_workflow_id.return_value.current_audit = aug.id - _auth_group_users.return_value.filter.exists = True - self.audit.workflow_type = WorkflowType.SQL_REVIEW - self.audit.workflow_id = self.wf.id - self.audit.save() - r = Audit.can_review(self.su, self.audit.workflow_id, self.audit.workflow_type) - self.assertEqual(r, True) - - @patch("sql.utils.workflow_audit.auth_group_users") - @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") - def test_can_review_wrong_status(self, _detail_by_workflow_id, _auth_group_users): - """测试判断用户当前是否是可审核,非待审核工单""" - aug = Group.objects.create(name="auth_group") - _detail_by_workflow_id.return_value.current_audit = aug.id - _auth_group_users.return_value.filter.exists = True - self.audit.workflow_type = WorkflowType.SQL_REVIEW - self.audit.workflow_id = self.wf.id - self.audit.current_status = WorkflowStatus.PASSED - self.audit.save() - r = Audit.can_review( - self.user, self.audit.workflow_id, self.audit.workflow_type - ) - self.assertEqual(r, False) - - @patch("sql.utils.workflow_audit.auth_group_users") - @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") - def test_can_review_no_prem(self, _detail_by_workflow_id, _auth_group_users): - """测试判断用户当前是否是可审核,普通用户无权限""" - aug = Group.objects.create(name="auth_group") - _detail_by_workflow_id.return_value.current_audit = aug.id - _auth_group_users.return_value.filter.exists = True - self.audit.workflow_type = WorkflowType.SQL_REVIEW - self.audit.workflow_id = self.wf.id - self.audit.save() - r = Audit.can_review( - self.user, self.audit.workflow_id, self.audit.workflow_type - ) - self.assertEqual(r, False) - - @patch("sql.utils.workflow_audit.auth_group_users") - @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") - def test_can_review_no_prem_exception( - self, _detail_by_workflow_id, _auth_group_users - ): - """测试判断用户当前是否是可审核,权限组不存在""" - Group.objects.create(name="auth_group") - _detail_by_workflow_id.side_effect = RuntimeError() - _auth_group_users.return_value.filter.exists = True - self.audit.workflow_type = WorkflowType.SQL_REVIEW - self.audit.workflow_id = self.wf.id - self.audit.save() - with self.assertRaisesMessage(Exception, "当前审批auth_group_id不存在,请检查并清洗历史数据"): - Audit.can_review( - self.user, self.audit.workflow_id, self.audit.workflow_type - ) - - def test_review_info_no_review(self): - """测试获取当前工单审批流程和当前审核组,无需审批""" - self.audit.workflow_type = WorkflowType.SQL_REVIEW - self.audit.workflow_id = self.wf.id - self.audit.audit_auth_groups = "" - self.audit.current_audit = "-1" - self.audit.save() - audit_auth_group, current_audit_auth_group = Audit.review_info( - self.audit.workflow_id, self.audit.workflow_type - ) - self.assertEqual(audit_auth_group, "无需审批") - self.assertEqual(current_audit_auth_group, None) - - def test_review_info(self): - """测试获取当前工单审批流程和当前审核组,无需审批""" - aug = Group.objects.create(name="DBA") - self.audit.workflow_type = WorkflowType.SQL_REVIEW - self.audit.workflow_id = self.wf.id - self.audit.audit_auth_groups = str(aug.id) - self.audit.current_audit = str(aug.id) - self.audit.save() - audit_auth_group, current_audit_auth_group = Audit.review_info( - self.audit.workflow_id, self.audit.workflow_type - ) - self.assertEqual(audit_auth_group, "DBA") - self.assertEqual(current_audit_auth_group, "DBA") - - def test_logs(self): - """测试获取工单日志""" - r = Audit.logs(self.audit.audit_id).first() - self.assertEqual(r, self.wl) - - class TestDataMasking(TestCase): def setUp(self): self.superuser = User.objects.create(username="super", is_superuser=True) diff --git a/sql/utils/workflow_audit.py b/sql/utils/workflow_audit.py index 266cacbfa6..4f6578b102 100644 --- a/sql/utils/workflow_audit.py +++ b/sql/utils/workflow_audit.py @@ -1,10 +1,18 @@ # -*- coding: UTF-8 -*- +import dataclasses +import importlib +from dataclasses import dataclass, field +from typing import Union, Optional, List +import logging + from django.contrib.auth.models import Group from django.utils import timezone +from django.core.exceptions import ObjectDoesNotExist +from django.conf import settings from sql.utils.resource_group import user_groups, auth_group_users from sql.utils.sql_review import is_auto_review -from common.utils.const import WorkflowStatus, WorkflowType +from common.utils.const import WorkflowStatus, WorkflowType, WorkflowAction from sql.models import ( WorkflowAudit, WorkflowAuditDetail, @@ -19,278 +27,427 @@ from common.config import SysConfig -class Audit(object): - # 新增工单审核 - @staticmethod - def add(workflow_type, workflow_id): - result = {"status": 0, "msg": "", "data": []} +logger = logging.getLogger("default") - # 检查是否已存在待审核数据 - workflow_info = WorkflowAudit.objects.filter( - workflow_type=workflow_type, - workflow_id=workflow_id, - current_status=WorkflowStatus.WAITING, - ) - if len(workflow_info) >= 1: - result["msg"] = "该工单当前状态为待审核,请勿重复提交" - raise Exception(result["msg"]) - - # 获取工单信息 - if workflow_type == WorkflowType.QUERY: - workflow_detail = QueryPrivilegesApply.objects.get(apply_id=workflow_id) - workflow_title = workflow_detail.title - group_id = workflow_detail.group_id - group_name = workflow_detail.group_name - create_user = workflow_detail.user_name - create_user_display = workflow_detail.user_display - audit_auth_groups = workflow_detail.audit_auth_groups - workflow_remark = "" - elif workflow_type == WorkflowType.SQL_REVIEW: - workflow_detail = SqlWorkflow.objects.get(pk=workflow_id) - workflow_title = workflow_detail.workflow_name - group_id = workflow_detail.group_id - group_name = workflow_detail.group_name - create_user = workflow_detail.engineer - create_user_display = workflow_detail.engineer_display - audit_auth_groups = workflow_detail.audit_auth_groups - workflow_remark = "" - elif workflow_type == WorkflowType.ARCHIVE: - workflow_detail = ArchiveConfig.objects.get(pk=workflow_id) - workflow_title = workflow_detail.title - group_id = workflow_detail.resource_group.group_id - group_name = workflow_detail.resource_group.group_name - create_user = workflow_detail.user_name - create_user_display = workflow_detail.user_display - audit_auth_groups = workflow_detail.audit_auth_groups - workflow_remark = "" - else: - result["msg"] = "工单类型不存在" - raise Exception(result["msg"]) - # 校验是否配置审批流程 - if audit_auth_groups == "": - result["msg"] = "审批流程不能为空,请先配置审批流程" - raise Exception(result["msg"]) +class AuditException(Exception): + pass + + +@dataclass +class AuditSetting: + """ + audit_auth_groups 为 django 组的 id + """ + + audit_auth_groups: List = field(default_factory=list) + auto_pass: bool = False + + @property + def audit_auth_group_in_db(self): + return ",".join(str(x) for x in self.audit_auth_groups) + + +# 列出审核工单中不同状态的合法操作 +SUPPORTED_OPERATION_GRID = { + WorkflowStatus.WAITING.value: [ + WorkflowAction.PASS, + WorkflowAction.REJECT, + WorkflowAction.ABORT, + ], + WorkflowStatus.PASSED.value: [ + WorkflowAction.REJECT, + WorkflowAction.ABORT, + WorkflowAction.EXECUTE_SET_TIME, + WorkflowAction.EXECUTE_START, + WorkflowAction.EXECUTE_END, + ], + WorkflowStatus.REJECTED.value: [], + WorkflowStatus.ABORTED.value: [], +} + + +@dataclass +class AuditV2: + # workflow 对象有可能是还没有在数据库中创建的对象, 这里需要注意 + workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply] = None + sys_config: SysConfig = field(default_factory=SysConfig) + audit: WorkflowAudit = None + workflow_type: WorkflowType = WorkflowType.SQL_REVIEW + workflow_pk_field: str = "id" + # 归档表中没有下面两个参数, 所以对归档表来说一下两参数必传 + resource_group: str = "" + resource_group_id: int = 0 + + def __post_init__(self): + if not self.workflow: + if not self.audit: + raise ValueError("需要提供 WorkflowAudit 或 workflow") + self.get_workflow() + if isinstance(self.workflow, SqlWorkflow): + self.workflow_type = WorkflowType.SQL_REVIEW + self.workflow_pk_field = "id" + self.resource_group = self.workflow.group_name + self.resource_group_id = self.workflow.group_id + elif isinstance(self.workflow, ArchiveConfig): + self.workflow_type = WorkflowType.ARCHIVE + self.workflow_pk_field = "id" + try: + group_in_db = ResourceGroup.objects.get(group_name=self.resource_group) + self.resource_group_id = group_in_db.group_id + except ResourceGroup.DoesNotExist: + raise AuditException(f"参数错误, 未发现资源组 {self.resource_group}") + elif isinstance(self.workflow, QueryPrivilegesApply): + self.workflow_type = WorkflowType.QUERY + self.workflow_pk_field = "apply_id" + self.resource_group = self.workflow.group_name + self.resource_group_id = self.workflow.group_id + # 该方法可能获取不到相关的审批流, 但是也不要报错, 因为有的时候是新建工单, 此时还没有审批流 + self.get_audit_info() + # 防止 get_auditor 显式的传了个 None + if not self.sys_config: + self.sys_config = SysConfig() + + @property + def review_info(self) -> (str, str): + """获取可读的审批流信息, 包含整体的审批流和当前节点信息""" + if self.audit.audit_auth_groups == "": + audit_auth_group = "无需审批" else: - audit_auth_groups_list = audit_auth_groups.split(",") - - # 判断是否无需审核,并且修改审批人为空 - if SysConfig().get("auto_review", False): - if workflow_type == WorkflowType.SQL_REVIEW: - if is_auto_review(workflow_id): - sql_workflow = SqlWorkflow.objects.get(id=int(workflow_id)) - sql_workflow.audit_auth_groups = "无需审批" - sql_workflow.status = "workflow_review_pass" - sql_workflow.save() - audit_auth_groups_list = None - - # 无审核配置则无需审核,直接通过 - if audit_auth_groups_list is None: - # 向审核主表插入审核通过的数据 - audit_detail = WorkflowAudit() - audit_detail.group_id = group_id - audit_detail.group_name = group_name - audit_detail.workflow_id = workflow_id - audit_detail.workflow_type = workflow_type - audit_detail.workflow_title = workflow_title - audit_detail.workflow_remark = workflow_remark - audit_detail.audit_auth_groups = "" - audit_detail.current_audit = "-1" - audit_detail.next_audit = "-1" - audit_detail.current_status = WorkflowStatus.PASSED # 审核通过 - audit_detail.create_user = create_user - audit_detail.create_user_display = create_user_display - audit_detail.save() - result["data"] = {"workflow_status": WorkflowStatus.PASSED} - result["msg"] = "无审核配置,直接审核通过" - # 增加工单日志 - Audit.add_log( - audit_id=audit_detail.audit_id, - operation_type=0, - operation_type_desc="提交", - operation_info="无需审批,系统直接审核通过", - operator=audit_detail.create_user, - operator_display=audit_detail.create_user_display, - ) + try: + audit_auth_group = "->".join( + [ + Group.objects.get(id=auth_group_id).name + for auth_group_id in self.audit.audit_auth_groups.split(",") + ] + ) + except Group.DoesNotExist: + audit_auth_group = self.audit.audit_auth_groups + if self.audit.current_audit == "-1": + current_audit_auth_group = None else: - # 向审核主表插入待审核数据 - audit_detail = WorkflowAudit() - audit_detail.group_id = group_id - audit_detail.group_name = group_name - audit_detail.workflow_id = workflow_id - audit_detail.workflow_type = workflow_type - audit_detail.workflow_title = workflow_title - audit_detail.workflow_remark = workflow_remark - audit_detail.audit_auth_groups = ",".join(audit_auth_groups_list) - audit_detail.current_audit = audit_auth_groups_list[0] - # 判断有无下级审核 - if len(audit_auth_groups_list) == 1: - audit_detail.next_audit = "-1" - else: - audit_detail.next_audit = audit_auth_groups_list[1] - - audit_detail.current_status = WorkflowStatus.WAITING - audit_detail.create_user = create_user - audit_detail.create_user_display = create_user_display - audit_detail.save() - result["data"] = {"workflow_status": WorkflowStatus.WAITING} - # 增加工单日志 - audit_auth_group, current_audit_auth_group = Audit.review_info( - workflow_id, workflow_type + try: + current_audit_auth_group = Group.objects.get( + id=self.audit.current_audit + ).name + except Group.DoesNotExist: + current_audit_auth_group = self.audit.current_audit + return audit_auth_group, current_audit_auth_group + + def get_workflow(self): + """尝试从 audit 中取出 workflow""" + if self.audit.workflow_type == WorkflowType.QUERY: + self.workflow = QueryPrivilegesApply.objects.get( + apply_id=self.audit.workflow_id ) - Audit.add_log( - audit_id=audit_detail.audit_id, - operation_type=0, - operation_type_desc="提交", - operation_info="等待审批,审批流程:{}".format(audit_auth_group), - operator=audit_detail.create_user, - operator_display=audit_detail.create_user_display, + elif self.audit.workflow_type == WorkflowType.SQL_REVIEW: + self.workflow = SqlWorkflow.objects.get(id=self.audit.workflow_id) + elif self.audit.workflow_type == WorkflowType.ARCHIVE: + self.workflow = ArchiveConfig.objects.get(id=self.audit.workflow_id) + self.resource_group = self.audit.group_name + self.resource_group_id = self.audit.group_id + + def generate_audit_setting(self) -> AuditSetting: + if self.workflow_type == WorkflowType.SQL_REVIEW: + if self.sys_config.get("auto_review", False): + # 判断是否无需审批 + if is_auto_review(self.workflow.id): + return AuditSetting(auto_pass=True, audit_auth_groups=["无需审批"]) + if self.workflow_type in [WorkflowType.SQL_REVIEW, WorkflowType.QUERY]: + group_id = self.workflow.group_id + + else: + # ArchiveConfig + group_id = self.resource_group_id + try: + workflow_audit_setting = WorkflowAuditSetting.objects.get( + workflow_type=self.workflow_type, group_id=group_id ) - # 增加审核id - result["data"]["audit_id"] = audit_detail.audit_id - # 返回添加结果 - return result, audit_detail + except WorkflowAuditSetting.DoesNotExist: + raise AuditException(f"审批类型 {self.workflow_type.label} 未配置审流") + return AuditSetting( + audit_auth_groups=workflow_audit_setting.audit_auth_groups.split(",") + ) - # 工单审核 - @staticmethod - def audit( - audit_id, audit_status, audit_user, audit_remark - ) -> (dict, WorkflowAuditDetail): - result = {"status": 0, "msg": "ok", "data": 0} - audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) - - # 不同审核状态 - if audit_status == WorkflowStatus.PASSED: - # 判断当前工单是否为待审核状态 - if audit_detail.current_status != WorkflowStatus.WAITING: - result["msg"] = "工单不是待审核状态,请返回刷新" - raise Exception(result["msg"]) - - # 判断是否还有下一级审核 - if audit_detail.next_audit == "-1": - # 更新主表审核状态为审核通过 - audit_result = WorkflowAudit() - audit_result.audit_id = audit_id - audit_result.current_audit = "-1" - audit_result.current_status = WorkflowStatus.PASSED - audit_result.save(update_fields=["current_audit", "current_status"]) - else: - # 更新主表审核下级审核组和当前审核组 - audit_result = WorkflowAudit() - audit_result.audit_id = audit_id - audit_result.current_status = WorkflowStatus.WAITING - audit_result.current_audit = audit_detail.next_audit - # 判断后续是否还有下下一级审核组 - audit_auth_groups_list = audit_detail.audit_auth_groups.split(",") - for index, auth_group in enumerate(audit_auth_groups_list): - if auth_group == audit_detail.next_audit: - # 无下下级审核组 - if index == len(audit_auth_groups_list) - 1: - audit_result.next_audit = "-1" - break - # 存在下下级审核组 - else: - audit_result.next_audit = audit_auth_groups_list[index + 1] - audit_result.save( - update_fields=["current_audit", "next_audit", "current_status"] - ) + def create_audit(self) -> str: + """按照传进来的工作流创建审批流, 返回一个 message如果有任何错误, 会以 exception 的形式抛出, 其他情况都是正常进行""" + # 检查是否已存在待审核数据 + workflow_info = self.get_audit_info() + if workflow_info: + raise AuditException("该工单当前状态为待审核,请勿重复提交") + # 获取审批流程 + audit_setting = self.generate_audit_setting() - # 插入审核明细数据 - audit_detail_result = WorkflowAuditDetail() - audit_detail_result.audit_id = audit_id - audit_detail_result.audit_user = audit_user - audit_detail_result.audit_status = WorkflowStatus.PASSED - audit_detail_result.audit_time = timezone.now() - audit_detail_result.remark = audit_remark - audit_detail_result.save() - # 增加工单日志 - audit_auth_group, current_audit_auth_group = Audit.review_info( - audit_detail.workflow_id, audit_detail.workflow_type + if self.workflow_type == WorkflowType.QUERY: + workflow_title = self.workflow.title + group_id = self.workflow.group_id + group_name = self.workflow.group_name + create_user = self.workflow.user_name + create_user_display = self.workflow.user_display + self.workflow.audit_auth_groups = audit_setting.audit_auth_group_in_db + elif self.workflow_type == WorkflowType.SQL_REVIEW: + workflow_title = self.workflow.workflow_name + group_id = self.workflow.group_id + group_name = self.workflow.group_name + create_user = self.workflow.engineer + create_user_display = self.workflow.engineer_display + self.workflow.audit_auth_groups = audit_setting.audit_auth_group_in_db + elif self.workflow_type == WorkflowType.ARCHIVE: + workflow_title = self.workflow.title + group_id = self.resource_group_id + group_name = self.resource_group + create_user = self.workflow.user_name + create_user_display = self.workflow.user_display + self.workflow.audit_auth_groups = audit_setting.audit_auth_group_in_db + else: + raise AuditException(f"不支持的审核类型: {self.workflow_type.label}") + self.workflow.save() + self.audit = WorkflowAudit( + group_id=group_id, + group_name=group_name, + workflow_id=self.workflow.__getattribute__(self.workflow_pk_field), + workflow_type=self.workflow_type, + workflow_title=workflow_title, + audit_auth_groups=audit_setting.audit_auth_group_in_db, + current_audit="-1", + next_audit="-1", + create_user=create_user, + create_user_display=create_user_display, + ) + # 自动通过的情况 + if audit_setting.auto_pass: + self.audit.current_status = WorkflowStatus.PASSED + self.audit.save() + WorkflowLog.objects.create( + audit_id=self.audit.audit_id, + operation_type=WorkflowAction.SUBMIT, + operation_type_desc=WorkflowAction.SUBMIT.label, + operation_info="无需审批,系统直接审核通过", + operator=self.audit.create_user, + operator_display=self.audit.create_user_display, ) - Audit.add_log( - audit_id=audit_id, - operation_type=1, - operation_type_desc="审批通过", - operation_info="审批备注:{},下级审批:{}".format( - audit_remark, current_audit_auth_group - ), - operator=audit_user, - operator_display=Users.objects.get(username=audit_user).display, + + return "无需审批, 直接审核通过" + + # 向审核主表插入待审核数据 + self.audit.current_audit = audit_setting.audit_auth_groups[0] + # 判断有无下级审核 + if len(audit_setting.audit_auth_groups) == 1: + self.audit.next_audit = "-1" + else: + self.audit.next_audit = audit_setting.audit_auth_groups[1] + + self.audit.current_status = WorkflowStatus.WAITING + self.audit.create_user = create_user + self.audit.create_user_display = create_user_display + self.audit.save() + readable_review_flow, _ = self.review_info + audit_log = WorkflowLog( + audit_id=self.audit.audit_id, + operation_type=WorkflowAction.SUBMIT, + operation_type_desc=WorkflowAction.SUBMIT.label, + operation_info="等待审批,审批流程:{}".format(readable_review_flow), + operator=self.audit.create_user, + operator_display=self.audit.create_user_display, + ) + audit_log.save() + return "工单已正常提交" + + def can_operate(self, action: WorkflowAction, actor: Users): + """检查用户是否有权限做相关操作, 默认不返回, 如有权限问题, raise AuditException""" + # 首先检查工单状态和相关操作是否匹配, 如已通过的工单不能再通过 + allowed_actions = SUPPORTED_OPERATION_GRID.get(self.audit.current_status) + if not allowed_actions: + raise AuditException( + f"不允许的操作, 工单当前状态为 {self.audit.current_status}, 不允许做任何操作" ) - elif audit_status == WorkflowStatus.REJECTED: - # 判断当前工单是否为待审核状态 - if audit_detail.current_status != WorkflowStatus.WAITING: - result["msg"] = "工单不是待审核状态,请返回刷新" - raise Exception(result["msg"]) - - # 更新主表审核状态 - audit_result = WorkflowAudit() - audit_result.audit_id = audit_id - audit_result.current_audit = "-1" - audit_result.next_audit = "-1" - audit_result.current_status = WorkflowStatus.REJECTED - audit_result.save( - update_fields=["current_audit", "next_audit", "current_status"] + if action not in allowed_actions: + raise AuditException( + f"不允许的操作, 工单当前状态为 {self.audit.current_status}, 允许的操作为{','.join(x.label for x in allowed_actions)}" ) + if self.workflow_type == WorkflowType.QUERY: + need_user_permission = "sql.query_review" + elif self.workflow_type == WorkflowType.SQL_REVIEW: + need_user_permission = "sql.sql_review" + elif self.workflow_type == WorkflowType.ARCHIVE: + need_user_permission = "sql.archive_review" + else: + raise AuditException(f"不支持的工单类型: {self.workflow_type}") - # 插入审核明细数据 - audit_detail_result = WorkflowAuditDetail() - audit_detail_result.audit_id = audit_id - audit_detail_result.audit_user = audit_user - audit_detail_result.audit_status = WorkflowStatus.REJECTED - audit_detail_result.audit_time = timezone.now() - audit_detail_result.remark = audit_remark - audit_detail_result.save() - # 增加工单日志 - Audit.add_log( - audit_id=audit_id, - operation_type=2, - operation_type_desc="审批不通过", - operation_info="审批备注:{}".format(audit_remark), - operator=audit_user, - operator_display=Users.objects.get(username=audit_user).display, - ) - elif audit_status == WorkflowStatus.ABORTED: - # 判断当前工单是否为待审核/审核通过状态 - if ( - audit_detail.current_status != WorkflowStatus.WAITING - and audit_detail.current_status != WorkflowStatus.PASSED + if action == WorkflowAction.ABORT: + if actor.username != self.audit.create_user: + raise AuditException(f"只有工单提交者可以撤回工单") + return + if action in [WorkflowAction.PASS, WorkflowAction.REJECT]: + # 需要检查权限 + # 超级用户可以审批所有工单 + if actor.is_superuser: + return + # 看是否本人审核 + if actor.username == self.audit.create_user and self.sys_config.get( + "ban_self_audit" ): - result["msg"] = "工单不是待审核态/审核通过状态,请返回刷新" - raise Exception(result["msg"]) - - # 更新主表审核状态 - audit_result = WorkflowAudit() - audit_result.audit_id = audit_id - audit_result.next_audit = "-1" - audit_result.current_status = WorkflowStatus.ABORTED - audit_result.save(update_fields=["current_status", "next_audit"]) - - # 插入审核明细数据 - audit_detail_result = WorkflowAuditDetail() - audit_detail_result.audit_id = audit_id - audit_detail_result.audit_user = audit_user - audit_detail_result.audit_status = WorkflowStatus.ABORTED - audit_detail_result.audit_time = timezone.now() - audit_detail_result.remark = audit_remark - audit_detail_result.save() - - # 增加工单日志 - Audit.add_log( - audit_id=audit_id, - operation_type=3, - operation_type_desc="审批取消", - operation_info="取消原因:{}".format(audit_remark), - operator=audit_user, - operator_display=Users.objects.get(username=audit_user).display, + raise AuditException("当前配置禁止本人审核自己的工单") + # 确认用户权限 + if not actor.has_perm(need_user_permission): + raise AuditException("用户无相关审批权限, 请合理配置权限") + + # 确认权限, 是否在当前审核组内 + try: + audit_auth_group = Group.objects.get(id=self.audit.current_audit) + except Group.DoesNotExist: + raise AuditException("当前审批权限组不存在, 请联系管理员检查并清洗错误数据") + if not auth_group_users([audit_auth_group.name], self.resource_group_id): + raise AuditException("用户不在当前审批审批节点的用户组内, 无权限审核") + return + if action in [ + WorkflowAction.EXECUTE_START, + WorkflowAction.EXECUTE_END, + WorkflowAction.EXECUTE_SET_TIME, + ]: + # 一般是系统自动流转, 自动通过 + return + + raise AuditException(f"不支持的操作, 无法判断权限") + + def operate( + self, action: WorkflowAction, actor: Users, remark: str + ) -> WorkflowAuditDetail: + """操作已提交的工单""" + if not self.audit: + raise AuditException(f"给定工单未绑定审批信息, 无法进行操作") + self.can_operate(action, actor) + + if action == WorkflowAction.PASS: + return self.operate_pass(actor, remark) + if action == WorkflowAction.REJECT: + return self.operate_reject(actor, remark) + if action == WorkflowAction.ABORT: + return self.operate_abort(actor, remark) + + def get_audit_info(self) -> Optional[WorkflowAudit]: + """尝试根据 workflow 取出审批工作流""" + if self.audit: + return self.audit + try: + self.audit = WorkflowAudit.objects.get( + workflow_type=self.workflow_type, + workflow_id=getattr(self.workflow, self.workflow_pk_field), ) + if self.audit.workflow_type == WorkflowType.ARCHIVE: + self.resource_group = self.audit.group_name + self.resource_group_id = self.audit.group_id + return self.audit + except ObjectDoesNotExist: + return None + + def operate_pass(self, actor: Users, remark: str) -> WorkflowAuditDetail: + # 判断是否还有下一级审核 + if self.audit.next_audit == "-1": + # 无下一级, 更新主表审核状态为审核通过 + self.audit.current_audit = "-1" + self.audit.current_status = WorkflowStatus.PASSED + self.audit.save() else: - result["msg"] = "审核异常" - raise Exception(result["msg"]) + # 更新主表审核下级审核组和当前审核组 + self.audit.current_status = WorkflowStatus.WAITING + self.audit.current_audit = self.audit.next_audit + # 判断后续是否还有下下一级审核组 + audit_auth_groups_list = self.audit.audit_auth_groups.split(",") + try: + position = audit_auth_groups_list.index(str(self.audit.current_audit)) + except ValueError as e: + logger.error( + f"审批流配置错误, 审批节点{self.audit.current_audit} 不在审批流内: 审核ID {self.audit.audit_id}" + ) + raise e + if position + 1 >= len(audit_auth_groups_list): + # 最后一个节点 + self.audit.next_audit = "-1" + else: + self.audit.next_audit = audit_auth_groups_list[position + 1] + self.audit.save() + + # 插入审核明细数据 + audit_detail_result = WorkflowAuditDetail.objects.create( + audit_id=self.audit.audit_id, + audit_user=actor.username, + audit_status=WorkflowStatus.PASSED, + audit_time=timezone.now(), + remark=remark, + ) + + if self.audit.current_audit == "-1": + operation_info = f"审批备注: {remark}, 无下级审批" + else: + operation_info = f"审批备注:{remark}, 下级审批:{self.audit.current_audit}" + + # 增加工单日志 + WorkflowLog.objects.create( + audit_id=self.audit.audit_id, + operation_type=WorkflowAction.PASS, + operation_type_desc=WorkflowAction.PASS.label, + operation_info=operation_info, + operator=actor.username, + operator_display=actor.display, + ) + return audit_detail_result + + def operate_reject(self, actor: Users, remark: str) -> WorkflowAuditDetail: + # 更新主表审核状态 + self.audit.current_audit = "-1" + self.audit.next_audit = "-1" + self.audit.current_status = WorkflowStatus.REJECTED + self.audit.save() + # 插入审核明细数据 + workflow_audit_detail = WorkflowAuditDetail.objects.create( + audit_id=self.audit.audit_id, + audit_user=actor.username, + audit_status=WorkflowStatus.REJECTED, + audit_time=timezone.now(), + remark=remark, + ) + # 增加工单日志 + WorkflowLog.objects.create( + audit_id=self.audit.audit_id, + operation_type=2, + operation_type_desc="审批不通过", + operation_info="审批备注:{}".format(remark), + operator=actor.username, + operator_display=actor.display, + ) - # 返回审核结果 - result["data"] = {"workflow_status": audit_result.current_status} - return result, audit_detail_result + return workflow_audit_detail + + def operate_abort(self, actor: Users, remark: str) -> WorkflowAuditDetail: + # 更新主表审核状态 + + self.audit.next_audit = "-1" + self.audit.current_status = WorkflowStatus.ABORTED + self.audit.save() + + # 插入审核明细数据 + workflow_audit_detail = WorkflowAuditDetail.objects.create( + audit_id=self.audit.audit_id, + audit_user=actor.username, + audit_status=WorkflowStatus.ABORTED, + audit_time=timezone.now(), + remark=remark, + ) + # 增加工单日志 + WorkflowLog.objects.create( + audit_id=self.audit.audit_id, + operation_type=3, + operation_type_desc="审批取消", + operation_info="取消原因:{}".format(remark), + operator=actor.username, + operator_display=actor.display, + ) + return workflow_audit_detail + + +class Audit(object): + """老版 Audit, 建议不再更新新内容, 转而使用 AuditV2""" # 获取用户待办工单数量 @staticmethod @@ -465,3 +622,28 @@ def add_log( @staticmethod def logs(audit_id): return WorkflowLog.objects.filter(audit_id=audit_id) + + +def get_auditor( + # workflow 对象有可能是还没有在数据库中创建的对象, 这里需要注意 + workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply] = None, + sys_config: SysConfig = None, + audit: WorkflowAudit = None, + workflow_type: WorkflowType = WorkflowType.SQL_REVIEW, + workflow_pk_field: str = "id", + # 归档表中没有下面两个参数, 所以对归档表来说一下两参数必传 + resource_group: str = "", + resource_group_id: int = 0, +) -> AuditV2: + current_auditor = settings.CURRENT_AUDITOR + module, o = current_auditor.split(":") + auditor = getattr(importlib.import_module(module), o) + return auditor( + workflow=workflow, + workflow_type=workflow_type, + workflow_pk_field=workflow_pk_field, + sys_config=sys_config, + audit=audit, + resource_group=resource_group, + resource_group_id=resource_group_id, + ) diff --git a/sql_api/api_workflow.py b/sql_api/api_workflow.py index e8e0e3b785..baf55e7974 100644 --- a/sql_api/api_workflow.py +++ b/sql_api/api_workflow.py @@ -12,7 +12,7 @@ from rest_framework.response import Response from common.config import SysConfig -from common.utils.const import WorkflowStatus, WorkflowType +from common.utils.const import WorkflowStatus, WorkflowType, WorkflowAction from sql.engines import get_engine from sql.models import ( SqlWorkflow, @@ -21,13 +21,14 @@ Users, WorkflowLog, ArchiveConfig, + QueryPrivilegesApply, ) from sql.notify import notify_for_audit, notify_for_execute from sql.query_privileges import _query_apply_audit_call_back from sql.utils.resource_group import user_groups from sql.utils.sql_review import can_cancel, can_execute, on_correct_time_period from sql.utils.tasks import del_schedule -from sql.utils.workflow_audit import Audit +from sql.utils.workflow_audit import Audit, get_auditor, AuditException from .filters import WorkflowFilter, WorkflowAuditFilter from .pagination import CustomizedPagination from .serializers import ( @@ -221,258 +222,79 @@ def post(self, request): serializer = AuditWorkflowSerializer(data=request.data) if not serializer.is_valid(): return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - - audit_type = request.data["audit_type"] - workflow_type = request.data["workflow_type"] - workflow_id = request.data["workflow_id"] - audit_remark = request.data["audit_remark"] - engineer = request.data["engineer"] - user = Users.objects.get(username=engineer) - - # 审核查询权限申请 - if workflow_type == 1: - audit_status = 1 if audit_type == "pass" else 2 - - if audit_remark is None: - audit_remark = "" - - if Audit.can_review(user, workflow_id, workflow_type) is False: - raise serializers.ValidationError({"errors": "你无权操作当前工单!"}) - - # 使用事务保持数据一致性 - try: - with transaction.atomic(): - workflow_audit = Audit.detail_by_workflow_id( - workflow_id=workflow_id, - workflow_type=WorkflowType.QUERY, - ) - audit_id = workflow_audit.audit_id - - # 调用工作流接口审核 - audit_result, audit_detail = Audit.audit( - audit_id, audit_status, user.username, audit_remark - ) - - # 按照审核结果更新业务表审核状态 - workflow_audit = Audit.detail(audit_id) - if workflow_audit.workflow_type == WorkflowType.QUERY: - # 更新业务表审核状态,插入权限信息 - _query_apply_audit_call_back( - workflow_audit.workflow_id, - audit_result["data"]["workflow_status"], - ) - - except Exception as msg: - logger.error(traceback.format_exc()) - raise serializers.ValidationError({"errors": msg}) + # 此处已经通过校验, 肯定存在, 就不 try 了 + workflow_audit = WorkflowAudit.objects.get( + workflow_id=serializer.data["workflow_id"], + workflow_type=serializer.data["workflow_type"], + ) + sys_config = SysConfig() + auditor = get_auditor(audit=workflow_audit) + user = Users.objects.get(username=serializer.data["engineer"]) + if serializer.data["audit_type"] == "pass": + action = WorkflowAction.PASS + notify_config_key = "Pass" + success_message = "passed" + elif serializer.data["audit_type"] == "cancel": + notify_config_key = "Cancel" + success_message = "canceled" + if auditor.workflow.engineer == serializer.data["engineer"]: + action = WorkflowAction.ABORT else: - # 消息通知 - async_task( - notify_for_audit, - workflow_audit=workflow_audit, - workflow_audit_detail=audit_detail, - timeout=60, - task_name=f"query-priv-audit-{workflow_id}", - ) - return ( - Response({"msg": "passed"}) - if audit_type == "pass" - else Response({"msg": "canceled"}) - ) - # 审核SQL上线申请 - elif workflow_type == 2: - # SQL上线申请通过 - if audit_type == "pass": - # 权限验证 - if Audit.can_review(user, workflow_id, workflow_type) is False: - raise serializers.ValidationError({"errors": "你无权操作当前工单!"}) - - # 使用事务保持数据一致性 - try: - with transaction.atomic(): - # 调用工作流接口审核 - workflow_audit = Audit.detail_by_workflow_id( - workflow_id=workflow_id, - workflow_type=WorkflowType.SQL_REVIEW, - ) - audit_id = workflow_audit.audit_id - audit_result, audit_detail = Audit.audit( - audit_id, - WorkflowStatus.PASSED, - user.username, - audit_remark, - ) - - # 按照审核结果更新业务表审核状态 - if ( - audit_result["data"]["workflow_status"] - == WorkflowStatus.PASSED - ): - # 将流程状态修改为审核通过 - SqlWorkflow( - id=workflow_id, status="workflow_review_pass" - ).save(update_fields=["status"]) - except Exception as msg: - logger.error(traceback.format_exc()) - raise serializers.ValidationError({"errors": msg}) - else: - # 开启了Pass阶段通知参数才发送消息通知 - sys_config = SysConfig() - is_notified = ( - "Pass" in sys_config.get("notify_phase_control").split(",") - if sys_config.get("notify_phase_control") - else True - ) - if is_notified: - async_task( - notify_for_audit, - workflow_audit=workflow_audit, - workflow_audit_detail=audit_detail, - timeout=60, - task_name=f"sqlreview-pass-{workflow_id}", - ) - return Response({"msg": "passed"}) - # SQL上线申请驳回/取消 - elif audit_type == "cancel": - workflow_detail = SqlWorkflow.objects.get(id=workflow_id) - - if audit_remark is None: - raise serializers.ValidationError({"errors": "终止原因不能为空"}) - - if can_cancel(user, workflow_id) is False: - raise serializers.ValidationError({"errors": "你无权操作当前工单!"}) - - # 使用事务保持数据一致性 - try: - with transaction.atomic(): - # 调用工作流接口取消或者驳回 - audit_id = Audit.detail_by_workflow_id( - workflow_id=workflow_id, - workflow_type=WorkflowType.SQL_REVIEW, - ).audit_id - # 仅待审核的需要调用工作流,审核通过的不需要 - if workflow_detail.status != "workflow_manreviewing": - # 增加工单日志 - if user.username == workflow_detail.engineer: - _, audit_detail = Audit.add_log( - audit_id=audit_id, - operation_type=3, - operation_type_desc="取消执行", - operation_info="取消原因:{}".format(audit_remark), - operator=user.username, - operator_display=user.display, - ) - else: - _, audit_detail = Audit.add_log( - audit_id=audit_id, - operation_type=2, - operation_type_desc="审批不通过", - operation_info="审批备注:{}".format(audit_remark), - operator=user.username, - operator_display=user.display, - ) - else: - if user.username == workflow_detail.engineer: - _, audit_detail = Audit.audit( - audit_id, - WorkflowStatus.ABORTED, - user.username, - audit_remark, - ) - # 非提交人需要校验审核权限 - elif user.has_perm("sql.sql_review"): - _, audit_detail = Audit.audit( - audit_id, - WorkflowStatus.REJECTED, - user.username, - audit_remark, - ) - else: - raise serializers.ValidationError( - {"errors": "Permission Denied"} - ) - - # 删除定时执行task - if workflow_detail.status == "workflow_timingtask": - schedule_name = f"sqlreview-timing-{workflow_id}" - del_schedule(schedule_name) - # 将流程状态修改为人工终止流程 - workflow_detail.status = "workflow_abort" - workflow_detail.save() - except Exception as msg: - logger.error(f"取消工单报错,错误信息:{traceback.format_exc()}") - raise serializers.ValidationError({"errors": msg}) - else: - # 发送取消、驳回通知,开启了Cancel阶段通知参数才发送消息通知 - sys_config = SysConfig() - is_notified = ( - "Cancel" in sys_config.get("notify_phase_control").split(",") - if sys_config.get("notify_phase_control") - else True - ) - if is_notified: - workflow_audit = Audit.detail_by_workflow_id( - workflow_id=workflow_id, - workflow_type=WorkflowType.SQL_REVIEW, - ) - if workflow_audit.current_status in ( - WorkflowStatus.ABORTED, - WorkflowStatus.REJECTED, - ): - async_task( - notify_for_audit, - workflow_audit=workflow_audit, - workflow_audit_detail=audit_detail, - timeout=60, - task_name=f"sqlreview-cancel-{workflow_id}", - ) - return Response({"msg": "canceled"}) - # 审核数据归档申请 - elif workflow_type == 3: - audit_status = 1 if audit_type == "pass" else 2 - - if audit_remark is None: - audit_remark = "" - - if Audit.can_review(user, workflow_id, workflow_type) is False: - raise serializers.ValidationError({"errors": "你无权操作当前工单!"}) - - # 使用事务保持数据一致性 - try: - with transaction.atomic(): - workflow_audit = Audit.detail_by_workflow_id( - workflow_id=workflow_id, - workflow_type=WorkflowType.ARCHIVE, - ) - audit_id = workflow_audit.audit_id + raise serializers.ValidationError({"errors": "用户无权操作此工单"}) + else: + raise serializers.ValidationError( + {"errors": "audit_type 只能是 pass 或 cancel"} + ) - # 调用工作流插入审核信息,更新业务表审核状态 - audit_status, audit_detail = Audit.audit( - audit_id, audit_status, user.username, audit_remark - ) - audit_status = audit_status["data"]["workflow_status"] - ArchiveConfig( - id=workflow_id, - status=audit_status, - state=True if audit_status == WorkflowStatus.PASSED else False, - ).save(update_fields=["status", "state"]) - except Exception as msg: - logger.error(traceback.format_exc()) - raise serializers.ValidationError({"errors": msg}) + try: + workflow_audit_detail = auditor.operate( + action, user, serializer.data["audit_remark"] + ) + except AuditException as e: + raise serializers.ValidationError({"errors": f"操作失败, {str(e)}"}) + + # 最后处置一下原本工单的状态 + if auditor.workflow_type == WorkflowType.QUERY: + _query_apply_audit_call_back( + auditor.audit.workflow_id, + auditor.audit.current_status, + ) + elif auditor.workflow_type == WorkflowType.SQL_REVIEW: + if auditor.audit.current_status == WorkflowStatus.PASSED: + auditor.workflow.status = "workflow_review_pass" + auditor.workflow.save(update_fields=["status"]) + elif auditor.audit.current_status in [ + WorkflowStatus.ABORTED, + WorkflowStatus.REJECTED, + ]: + if auditor.workflow.status == "workflow_timingtask": + del_schedule(f"sqlreview-timing-{auditor.workflow.id}") + # 将流程状态修改为人工终止流程 + auditor.workflow.status = "workflow_abort" + auditor.workflow.save(update_fields=["status"]) + elif auditor.workflow_type == WorkflowType.ARCHIVE: + auditor.workflow.status = auditor.audit.current_status + if auditor.audit.current_status == WorkflowStatus.PASSED: + auditor.workflow.state = True else: - # 消息通知 - async_task( - notify_for_audit, - workflow_audit=workflow_audit, - workflow_audit_detail=audit_detail, - timeout=60, - task_name=f"archive-audit-{workflow_id}", - ) - return ( - Response({"msg": "passed"}) - if audit_type == "pass" - else Response({"msg": "canceled"}) - ) + auditor.workflow.state = False + auditor.workflow.save(update_fields=["status", "state"]) + + # 发消息 + is_notified = ( + notify_config_key in sys_config.get("notify_phase_control").split(",") + if sys_config.get("notify_phase_control") + else True + ) + if is_notified: + async_task( + notify_for_audit, + workflow_audit=auditor.audit, + workflow_audit_detail=workflow_audit_detail, + timeout=60, + task_name=f"notify-audit-{auditor.audit}-{WorkflowType(auditor.audit.workflow_type).label}", + ) + return Response({"msg": success_message}) class ExecuteWorkflow(views.APIView): diff --git a/sql_api/serializers.py b/sql_api/serializers.py index 389e00809c..1b87f45615 100644 --- a/sql_api/serializers.py +++ b/sql_api/serializers.py @@ -18,7 +18,7 @@ from django.core.exceptions import ValidationError from django.db import transaction from sql.engines import get_engine -from sql.utils.workflow_audit import Audit +from sql.utils.workflow_audit import Audit, get_auditor from sql.utils.resource_group import user_instances from common.utils.const import WorkflowType from common.config import SysConfig @@ -413,21 +413,18 @@ def create(self, validated_data): engineer=user.username, engineer_display=user.display, group_name=group.group_name, - audit_auth_groups=Audit.settings( - workflow_data["group_id"], WorkflowType.SQL_REVIEW - ), + audit_auth_groups="", ) try: with transaction.atomic(): - workflow = SqlWorkflow.objects.create(**workflow_data) + workflow = SqlWorkflow(**workflow_data) validated_data["review_content"] = check_result.json() + # 自动创建工作流 + auditor = get_auditor(workflow=workflow) + auditor.create_audit() workflow_content = SqlWorkflowContent.objects.create( workflow=workflow, **validated_data ) - # 自动审核通过了,才调用工作流 - if workflow_status == "workflow_manreviewing": - # 调用工作流插入审核信息, SQL上线权限申请workflow_type=2 - Audit.add(WorkflowType.SQL_REVIEW, workflow.id) except Exception as e: logger.error(f"提交工单报错,错误信息:{traceback.format_exc()}") raise serializers.ValidationError({"errors": str(e)}) @@ -451,7 +448,7 @@ class AuditWorkflowSerializer(serializers.Serializer): workflow_id = serializers.IntegerField(label="工单id") audit_remark = serializers.CharField(label="审批备注") workflow_type = serializers.ChoiceField( - choices=[1, 2, 3], label="工单类型:1-查询权限申请,2-SQL上线申请,3-数据归档申请" + choices=WorkflowType.choices, label="工单类型:1-查询权限申请,2-SQL上线申请,3-数据归档申请" ) audit_type = serializers.ChoiceField(choices=["pass", "cancel"], label="审核类型") diff --git a/sql_api/tests.py b/sql_api/tests.py index f096c7fcea..995f940605 100644 --- a/sql_api/tests.py +++ b/sql_api/tests.py @@ -410,6 +410,8 @@ def setUp(self): current_audit="1", next_audit="-1", current_status=0, + create_user=self.user.username, + create_user_display=self.user.display, ) self.wl = WorkflowLog.objects.create( audit_id=self.audit1.audit_id, operation_type=1