diff --git a/archery/settings.py b/archery/settings.py index dfc3d3768c..9e64b930e4 100644 --- a/archery/settings.py +++ b/archery/settings.py @@ -62,6 +62,7 @@ "sql.notify:FeishuWebhookNotifier", "sql.notify:FeishuPersonNotifier", "sql.notify:QywxWebhookNotifier", + "sql.notify:QywxToUserNotifier", "sql.notify:MailNotifier", "sql.notify:GenericWebhookNotifier", ], diff --git a/conftest.py b/conftest.py index 1b16364e55..00c2ea42a7 100644 --- a/conftest.py +++ b/conftest.py @@ -13,6 +13,7 @@ QueryPrivilegesApply, ArchiveConfig, InstanceTag, + WorkflowAudit, ) from common.config import SysConfig from sql.utils.workflow_audit import AuditV2, AuditSetting @@ -157,3 +158,35 @@ def instance_tag(db): tag = InstanceTag.objects.create(tag_code="test_tag", tag_name="测试标签") yield tag tag.delete() + + +@pytest.fixture +def create_resource_group(db): + resource_group = ResourceGroup.objects.create( + group_name="group_name", + is_deleted=False, + qywx_webhook="https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx", + feishu_webhook="https://open.feishu.cn/open-apis/bot/v2/hook/xxx", + ding_webhook="https://oapi.dingtalk.com/robot/send?access_token=xxx", + ) + yield resource_group + resource_group.delete() + + +@pytest.fixture +def create_audit_workflow(normal_user, create_resource_group): + audit_wf = WorkflowAudit.objects.create( + group_id=create_resource_group.group_id, + group_name=create_resource_group.group_name, + workflow_id=1, + workflow_type=2, + workflow_title="申请标题", + workflow_remark="申请备注", + audit_auth_groups="1", + current_audit="1", + next_audit="2", + current_status=0, + create_user=normal_user.username, + ) + yield audit_wf + audit_wf.delete() diff --git a/sql/notify.py b/sql/notify.py index 92c2a56d88..c68618ca52 100755 --- a/sql/notify.py +++ b/sql/notify.py @@ -448,6 +448,20 @@ def send(self): ) +class QywxToUserNotifier(LegacyRender): + name = "qywx_to_user" + sys_config_key: str = "wx" + + def send(self): + msg_sender = MsgSender() + for m in self.messages: + msg_to_wx_user = [ + user.wx_user_id if user.wx_user_id else user.username + for user in chain(m.msg_to, m.msg_cc) + ] + msg_sender.send_wx2user(f"{m.msg_title}\n{m.msg_content}", msg_to_wx_user) + + class MailNotifier(LegacyRender): name = "mail" sys_config_key = "mail" diff --git a/sql/test_notify.py b/sql/test_notify.py index 3d5bb6b648..8cc9412595 100644 --- a/sql/test_notify.py +++ b/sql/test_notify.py @@ -1,6 +1,8 @@ import json -from datetime import datetime, timedelta +from datetime import datetime, timedelta, date from unittest.mock import patch, Mock, ANY +import pytest +from pytest_mock import MockFixture from django.contrib.auth.models import Group from django.contrib.auth import get_user_model @@ -28,6 +30,7 @@ FeishuPersonNotifier, FeishuWebhookNotifier, QywxWebhookNotifier, + QywxToUserNotifier, LegacyMessage, Notifier, notify_for_execute, @@ -58,7 +61,7 @@ def setUp(self): ) self.su.groups.add(self.aug) - tomorrow = datetime.today() + timedelta(days=1) + tomorrow = date.today() + timedelta(days=1) self.ins = Instance.objects.create( instance_name="some_ins", type="slave", @@ -193,6 +196,11 @@ def test_base_notifier(self): n = Notifier(workflow=self.wf, sys_config=self.sys_config) n.sys_config_key = "foo" self.assertTrue(n.should_run()) + with self.assertRaises(NotImplementedError): + n.run() + n.send = Mock() + n.render = Mock() + n.run() n.sys_config_key = "not-foo" self.assertFalse(n.should_run()) @@ -414,6 +422,7 @@ def test_legacy_render_m2sql(self): self.assertEqual(notifier.messages[0].msg_title, "[Archery 通知]My2SQL执行失败") def test_general_webhook(self): + # SQL 上线工单 notifier = GenericWebhookNotifier( workflow=self.wf, event_type=EventType.AUDIT, @@ -467,121 +476,56 @@ def test_general_webhook(self): self.assertEqual( notifier.request_data["instance"]["instance_name"], self.ins.instance_name ) - - -class TestNotifySend(TestCase): - audit_wf: WorkflowAudit = None - rs: ResourceGroup = None - user: User = None - - @classmethod - def setUpClass(cls): - cls.user = User.objects.create( - username="test", - email="test@example.com", - ding_user_id="1234", - wx_user_id="1234", - feishu_open_id="1234", - ) - cls.rs = ResourceGroup.objects.create( - group_name="test", - ding_webhook="ding_url", - feishu_webhook="feishu_url", - qywx_webhook="qywx_url", + # SQL 查询工单 + notifier = GenericWebhookNotifier( + workflow=self.query_apply_1, + event_type=EventType.AUDIT, + audit=self.audit_query, + audit_detail=self.audit_query_detail, + sys_config=self.sys_config, ) - cls.audit_wf = WorkflowAudit.objects.create( - group_id=cls.rs.group_id, - group_name="some_group", - workflow_id=1, - workflow_type=2, - workflow_title="申请标题", - workflow_remark="申请备注", - audit_auth_groups="1", - current_audit="1", - next_audit="2", - current_status=0, - create_user=cls.user.username, + notifier.render() + self.assertIsNotNone(notifier.request_data) + self.assertEqual( + notifier.request_data["workflow_content"]["title"], self.query_apply_1.title ) - @classmethod - def tearDownClass(cls): - cls.user.delete() - cls.rs.delete() - cls.audit_wf.delete() - - def setUp(self): - self.patcher = patch("sql.notify.MsgSender") - self.mock_msg_sender = self.patcher.start() - self.get_workflow_patcher = patch("sql.models.WorkflowAudit.get_workflow") - self.mock_get_workflow = self.get_workflow_patcher.start() - self.sys_config = SysConfig() - def tearDown(self): - self.patcher.stop() - self.get_workflow_patcher.stop() - - def generate_notifier(self, module) -> Notifier: - return module(workflow=None, audit=self.audit_wf, sys_config=self.sys_config) - - def test_ding_webhook_send(self): - mocker = Mock() - setattr(self.mock_msg_sender.return_value, "send_ding", mocker) - notifier = self.generate_notifier(DingdingWebhookNotifier) - notifier.messages = [ - LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") - ] - notifier.send() - mocker.assert_called_once() - - def test_ding_person_send(self): - mocker = Mock() - setattr(self.mock_msg_sender.return_value, "send_ding2user", mocker) - notifier = self.generate_notifier(DingdingPersonNotifier) - notifier.messages = [ - LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") - ] - notifier.send() - mocker.assert_called_once() - - def test_feishu_webhook(self): - mocker = Mock() - setattr(self.mock_msg_sender.return_value, "send_feishu_webhook", mocker) - notifier = self.generate_notifier(FeishuWebhookNotifier) - notifier.messages = [ - LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") - ] - notifier.send() - mocker.assert_called_once() - - def test_feishu_person(self): - mocker = Mock() - setattr(self.mock_msg_sender.return_value, "send_feishu_user", mocker) - notifier = self.generate_notifier(FeishuPersonNotifier) - notifier.messages = [ - LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") - ] - notifier.send() - mocker.assert_called_once() - - def test_qywx_webhook(self): - mocker = Mock() - setattr(self.mock_msg_sender.return_value, "send_qywx_webhook", mocker) - notifier = self.generate_notifier(QywxWebhookNotifier) - notifier.messages = [ - LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") - ] - notifier.send() - mocker.assert_called_once() - - def test_mail(self): - mocker = Mock() - setattr(self.mock_msg_sender.return_value, "send_email", mocker) - notifier = self.generate_notifier(MailNotifier) - notifier.messages = [ - LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") - ] - notifier.send() - mocker.assert_called_once() +@pytest.mark.parametrize( + "notifier_to_test,method_assert_called", + [ + (DingdingWebhookNotifier, "send_ding"), + (DingdingPersonNotifier, "send_ding2user"), + (FeishuWebhookNotifier, "send_feishu_webhook"), + (FeishuPersonNotifier, "send_feishu_user"), + (QywxWebhookNotifier, "send_qywx_webhook"), + (QywxToUserNotifier, "send_wx2user"), + (MailNotifier, "send_email"), + ], +) +def test_notify_send( + mocker: MockFixture, + create_audit_workflow, + notifier_to_test: Notifier.__class__, + method_assert_called: str, +): + """测试通知发送 + 初始化 notifier_to_test, 然后调用 send 方法, 然后断言对应的方法`method_assert_called`被调用了 + send 方法都是 MsgSender 的方法, 所以这里只需要断言 MsgSender 的方法被调用了, 如果没有用到 MsgSender 的方法, 那么就不需要这个测试 + 需要自己写别的测试 + """ + mock_send_method = Mock() + mock_msg_sender = mocker.patch("sql.notify.MsgSender") + mocker.patch("sql.models.WorkflowAudit.get_workflow") + setattr(mock_msg_sender.return_value, method_assert_called, mock_send_method) + notifier = notifier_to_test( + workflow=None, audit=create_audit_workflow, sys_config=SysConfig() + ) + notifier.messages = [ + LegacyMessage(msg_to=[Mock()], msg_title="test", msg_content="test") + ] + notifier.send() + mock_send_method.assert_called_once() def test_override_sys_key():