import json
from datetime import datetime, timedelta, date
from unittest.mock import patch, Mock, ANY
import pytest
from pytest_mock import MockFixture
from django.contrib.auth.models import Group
from django.contrib.auth import get_user_model
from django.test import TestCase
from common.config import SysConfig
from sql.models import (
Instance,
SqlWorkflow,
SqlWorkflowContent,
QueryPrivilegesApply,
WorkflowAudit,
WorkflowAuditDetail,
ResourceGroup,
ArchiveConfig,
)
from sql.notify import (
auto_notify,
EventType,
LegacyRender,
GenericWebhookNotifier,
My2SqlResult,
DingdingWebhookNotifier,
DingdingPersonNotifier,
FeishuPersonNotifier,
FeishuWebhookNotifier,
QywxWebhookNotifier,
QywxToUserNotifier,
LegacyMessage,
Notifier,
notify_for_execute,
notify_for_audit,
notify_for_my2sql,
MailNotifier,
)
User = get_user_model()
class TestNotify(TestCase):
"""
测试消息
"""
def setUp(self):
self.sys_config = SysConfig()
self.aug = Group.objects.create(id=1, name="auth_group")
self.user = User.objects.create(
username="test_user", display="中文显示", is_active=True
)
self.su = User.objects.create(
username="s_user",
display="中文显示",
is_active=True,
is_superuser=True,
)
self.su.groups.add(self.aug)
tomorrow = date.today() + timedelta(days=1)
self.ins = Instance.objects.create(
instance_name="some_ins",
type="slave",
db_type="mysql",
host="some_host",
port=3306,
user="ins_user",
password="some_str",
)
self.wf = SqlWorkflow.objects.create(
workflow_name="some_name",
group_id=1,
group_name="g1",
engineer=self.user.username,
engineer_display=self.user.display,
audit_auth_groups="some_audit_group",
create_time=datetime.now(),
status="workflow_timingtask",
is_backup=True,
instance=self.ins,
db_name="some_db",
syntax_type=1,
)
SqlWorkflowContent.objects.create(
workflow=self.wf, sql_content="some_sql", execute_result=""
)
self.query_apply_1 = QueryPrivilegesApply.objects.create(
group_id=1,
group_name="some_name",
title="some_title1",
user_name="some_user",
instance=self.ins,
db_list="some_db,some_db2",
limit_num=100,
valid_date=tomorrow,
priv_type=1,
status=0,
audit_auth_groups="some_audit_group",
)
# 必须要有的几个
# WorkflowAudit, 审核表, 每一个工作流关联一条记录
# WorkflowAuditDetail, 审核详情, 每一个审核步骤一条记录, 并且都关联到一个 WorkflowAudit
self.audit_wf = WorkflowAudit.objects.create(
group_id=1,
group_name="some_group",
workflow_id=self.wf.id,
workflow_type=2,
workflow_title="申请标题",
workflow_remark="申请备注",
audit_auth_groups="1",
current_audit="1",
next_audit="2",
current_status=0,
create_user=self.user.username,
)
self.audit_wf_detail = WorkflowAuditDetail.objects.create(
audit_id=self.audit_wf.audit_id,
audit_user=self.user.display,
audit_time=datetime.now(),
audit_status=1,
remark="测试备注",
)
self.audit_query = WorkflowAudit.objects.create(
group_id=1,
group_name="some_group",
workflow_id=self.query_apply_1.apply_id,
workflow_type=1,
workflow_title="申请标题",
workflow_remark="申请备注",
audit_auth_groups=",".join([str(self.aug.id)]),
current_audit=str(self.aug.id),
next_audit="-1",
current_status=0,
)
self.audit_query_detail = WorkflowAuditDetail.objects.create(
audit_id=self.audit_query.audit_id,
audit_user=self.user.display,
audit_time=datetime.now(),
audit_status=1,
remark="测试query备注",
)
self.rs = ResourceGroup.objects.create(group_id=1, ding_webhook="url")
self.archive_apply = ArchiveConfig.objects.create(
title="测试归档",
resource_group=self.rs,
src_instance=self.ins,
src_db_name="foo",
src_table_name="bar",
dest_db_name="foo-dest",
dest_table_name="bar-dest",
mode="purge",
no_delete=False,
status=0,
user_name=self.user.username,
user_display=self.user.display,
)
self.archive_apply_audit = WorkflowAudit.objects.create(
group_id=1,
group_name="some_group",
workflow_id=self.archive_apply.id,
workflow_type=3,
workflow_title=self.archive_apply.title,
workflow_remark="申请备注",
audit_auth_groups=",".join([str(self.aug.id)]),
current_audit=str(self.aug.id),
next_audit="-1",
current_status=0,
)
def tearDown(self):
self.sys_config.purge()
User.objects.all().delete()
SqlWorkflow.objects.all().delete()
SqlWorkflowContent.objects.all().delete()
WorkflowAudit.objects.all().delete()
WorkflowAuditDetail.objects.all().delete()
ArchiveConfig.objects.all().delete()
ResourceGroup.objects.all().delete()
def test_empty_notifiers(self):
with self.settings(ENABLED_NOTIFIERS=()):
auto_notify(
workflow=self.wf,
event_type=EventType.EXECUTE,
sys_config=self.sys_config,
)
def test_base_notifier(self):
self.sys_config.set("foo", "bar")
n = Notifier(workflow=self.wf, sys_config=self.sys_config)
n.sys_config_key = "foo"
self.assertTrue(n.should_run())
with self.assertRaises(NotImplementedError):
n.run()
n.send = Mock()
n.render = Mock()
n.run()
n.sys_config_key = "not-foo"
self.assertFalse(n.should_run())
def test_no_workflow_and_audit(self):
with self.assertRaises(ValueError):
Notifier(workflow=None, audit=None)
@patch("sql.notify.FeishuWebhookNotifier.run")
def test_auto_notify(self, mock_run):
with self.settings(ENABLED_NOTIFIERS=("sql.notify:FeishuWebhookNotifier",)):
auto_notify(self.sys_config, event_type=EventType.EXECUTE, workflow=self.wf)
mock_run.assert_called_once()
@patch("sql.notify.auto_notify")
def test_notify_for_execute(self, mock_auto_notify: Mock):
"""测试适配器"""
notify_for_execute(self.wf)
mock_auto_notify.assert_called_once_with(
workflow=self.wf, sys_config=ANY, event_type=EventType.EXECUTE
)
@patch("sql.notify.auto_notify")
def test_notify_for_audit(self, mock_auto_notify: Mock):
"""测试适配器"""
notify_for_audit(
workflow_audit=self.audit_wf, workflow_audit_detail=self.audit_wf_detail
)
mock_auto_notify.assert_called_once_with(
workflow=None,
event_type=EventType.AUDIT,
sys_config=ANY,
audit=self.audit_wf,
audit_detail=self.audit_wf_detail,
)
@patch("sql.notify.auto_notify")
def test_notify_for_m2sql(self, mock_auto_notify: Mock):
"""测试适配器"""
task = Mock()
task.success = True
task.kwargs = {"user": "foo"}
task.result = ["", "/foo"]
expect_workflow = My2SqlResult(success=True, submitter="foo", file_path="/foo")
notify_for_my2sql(task)
mock_auto_notify.assert_called_once_with(
workflow=expect_workflow, sys_config=ANY, event_type=EventType.M2SQL
)
mock_auto_notify.reset_mock()
# 测试失败的情况
task.success = False
task.result = "Traceback blahblah"
expect_workflow = My2SqlResult(
success=False, submitter="foo", error=task.result
)
notify_for_my2sql(task)
mock_auto_notify.assert_called_once_with(
workflow=expect_workflow, sys_config=ANY, event_type=EventType.M2SQL
)
# 下面的测试均为 notifier 的测试, 测试 render 和 send
def test_legacy_render_execution(self):
notifier = LegacyRender(
workflow=self.wf, event_type=EventType.EXECUTE, sys_config=self.sys_config
)
notifier.render()
self.assertEqual(len(notifier.messages), 1)
self.assertIn("工单", notifier.messages[0].msg_title)
with self.assertRaises(NotImplementedError):
notifier.send()
def test_legacy_render_execution_ddl(self):
"""DDL 比普通的工单多一个通知 dba"""
self.wf.syntax_type = 1
self.wf.status = "workflow_finish"
self.wf.save()
self.sys_config.set("ddl_notify_auth_group", self.aug.name)
notifier = LegacyRender(
workflow=self.wf, event_type=EventType.EXECUTE, sys_config=self.sys_config
)
notifier.render()
self.assertEqual(len(notifier.messages), 2)
self.assertIn("有新的DDL语句执行完成", notifier.messages[1].msg_title)
def test_legacy_render_audit(self):
notifier = LegacyRender(
workflow=self.wf,
event_type=EventType.AUDIT,
audit=self.audit_wf,
audit_detail=self.audit_wf_detail,
sys_config=self.sys_config,
)
notifier.render()
self.assertEqual(len(notifier.messages), 1)
self.assertIn("新的工单申请", notifier.messages[0].msg_title)
# 测试一下不传 workflow
notifier = LegacyRender(
event_type=EventType.AUDIT,
workflow=None,
audit=self.audit_wf,
audit_detail=self.audit_wf_detail,
sys_config=self.sys_config,
)
notifier.render()
self.assertEqual(len(notifier.messages), 1)
self.assertIn("新的工单申请", notifier.messages[0].msg_title)
def test_legacy_render_query_audit(self):
# 默认是库权限的
notifier = LegacyRender(
workflow=self.query_apply_1,
event_type=EventType.AUDIT,
audit=self.audit_query,
audit_detail=self.audit_query_detail,
sys_config=self.sys_config,
)
notifier.render()
self.assertEqual(len(notifier.messages), 1)
self.assertIn("数据库清单", notifier.messages[0].msg_content)
# 表级别的权限申请
self.query_apply_1.priv_type = 2
self.query_apply_1.table_list = "foo,bar"
self.query_apply_1.save()
notifier = LegacyRender(
workflow=self.query_apply_1,
event_type=EventType.AUDIT,
audit=self.audit_query,
audit_detail=self.audit_query_detail,
sys_config=self.sys_config,
)
notifier.render()
self.assertEqual(len(notifier.messages), 1)
self.assertIn("表清单", notifier.messages[0].msg_content)
self.assertIn("foo,bar", notifier.messages[0].msg_content)
def test_legacy_render_archive_audit(self):
notifier = LegacyRender(
workflow=self.archive_apply,
event_type=EventType.AUDIT,
audit=self.archive_apply_audit,
sys_config=self.sys_config,
)
notifier.render()
self.assertEqual(len(notifier.messages), 1)
self.assertIn("归档表", notifier.messages[0].msg_content)
def test_legacy_render_audit_success(self):
"""审核通过消息"""
# 只测试上线工单
self.audit_wf.current_status = 1
self.audit_wf.save()
notifier = LegacyRender(
workflow=self.wf,
event_type=EventType.AUDIT,
audit=self.audit_wf,
sys_config=self.sys_config,
)
notifier.render()
self.assertEqual(len(notifier.messages), 1)
self.assertIn("工单审核通过", notifier.messages[0].msg_title)
def test_legacy_render_audit_reject(self):
self.audit_wf.current_status = 2
self.audit_wf.save()
self.audit_wf_detail.remark = "驳回foo-bar"
self.audit_wf_detail.save()
notifier = LegacyRender(
workflow=self.wf,
event_type=EventType.AUDIT,
audit=self.audit_wf,
audit_detail=self.audit_wf_detail,
sys_config=self.sys_config,
)
notifier.render()
self.assertEqual(len(notifier.messages), 1)
self.assertIn("工单被驳回", notifier.messages[0].msg_title)
self.assertIn("驳回foo-bar", notifier.messages[0].msg_content)
def test_legacy_render_audit_abort(self):
self.audit_wf.current_status = 3
self.audit_wf.save()
self.audit_wf_detail.remark = "撤回foo-bar"
self.audit_wf_detail.save()
notifier = LegacyRender(
workflow=self.wf,
event_type=EventType.AUDIT,
audit=self.audit_wf,
audit_detail=self.audit_wf_detail,
sys_config=self.sys_config,
)
notifier.render()
self.assertEqual(len(notifier.messages), 1)
self.assertIn("提交人主动终止工单", notifier.messages[0].msg_title)
self.assertIn("撤回foo-bar", notifier.messages[0].msg_content)
def test_legacy_render_m2sql(self):
successful_workflow = My2SqlResult(
submitter=self.user.username, success=True, file_path="/foo/bar"
)
notifier = LegacyRender(
workflow=successful_workflow,
sys_config=self.sys_config,
event_type=EventType.M2SQL,
)
notifier.render()
self.assertEqual(len(notifier.messages), 1)
self.assertEqual(notifier.messages[0].msg_title, "[Archery 通知]My2SQL执行结束")
# 失败
failed_workflow = My2SqlResult(
submitter=self.user.username, success=False, error="Traceback blahblah"
)
notifier = LegacyRender(
workflow=failed_workflow,
sys_config=self.sys_config,
event_type=EventType.M2SQL,
)
notifier.render()
self.assertEqual(len(notifier.messages), 1)
self.assertEqual(notifier.messages[0].msg_title, "[Archery 通知]My2SQL执行失败")
def test_general_webhook(self):
# SQL 上线工单
notifier = GenericWebhookNotifier(
workflow=self.wf,
event_type=EventType.AUDIT,
audit=self.audit_wf,
audit_detail=self.audit_wf_detail,
sys_config=self.sys_config,
)
notifier.render()
self.assertIsNotNone(notifier.request_data)
print(json.dumps(notifier.request_data))
self.assertDictEqual(
notifier.request_data["audit"],
{
"audit_id": self.audit_wf.audit_id,
"group_name": "some_group",
"workflow_type": 2,
"create_user_display": "",
"workflow_title": "申请标题",
"audit_auth_groups": self.audit_wf.audit_auth_groups,
"current_audit": "1",
"current_status": 0,
"create_time": self.audit_wf.create_time.isoformat(),
},
)
self.assertDictEqual(
notifier.request_data["workflow_content"]["workflow"],
{
"id": self.wf.id,
"workflow_name": "some_name",
"demand_url": "",
"group_id": 1,
"group_name": "g1",
"db_name": "some_db",
"syntax_type": 1,
"is_backup": True,
"engineer": "test_user",
"engineer_display": "中文显示",
"status": "workflow_timingtask",
"audit_auth_groups": "some_audit_group",
"run_date_start": None,
"run_date_end": None,
"finish_time": None,
"is_manual": 0,
"instance": self.ins.id,
"create_time": self.wf.create_time.isoformat(),
"is_offline_export": 0,
"export_format": None,
"file_name": None,
},
)
self.assertEqual(
notifier.request_data["workflow_content"]["sql_content"], "some_sql"
)
self.assertEqual(
notifier.request_data["instance"]["instance_name"], self.ins.instance_name
)
# SQL 查询工单
notifier = GenericWebhookNotifier(
workflow=self.query_apply_1,
event_type=EventType.AUDIT,
audit=self.audit_query,
audit_detail=self.audit_query_detail,
sys_config=self.sys_config,
)
notifier.render()
self.assertIsNotNone(notifier.request_data)
self.assertEqual(
notifier.request_data["workflow_content"]["title"], self.query_apply_1.title
)
@pytest.mark.parametrize(
"notifier_to_test,method_assert_called",
[
(DingdingWebhookNotifier, "send_ding"),
(DingdingPersonNotifier, "send_ding2user"),
(FeishuWebhookNotifier, "send_feishu_webhook"),
(FeishuPersonNotifier, "send_feishu_user"),
(QywxWebhookNotifier, "send_qywx_webhook"),
(QywxToUserNotifier, "send_wx2user"),
(MailNotifier, "send_email"),
],
)
def test_notify_send(
mocker: MockFixture,
create_audit_workflow,
notifier_to_test: Notifier.__class__,
method_assert_called: str,
):
"""测试通知发送
初始化 notifier_to_test, 然后调用 send 方法, 然后断言对应的方法`method_assert_called`被调用了
send 方法都是 MsgSender 的方法, 所以这里只需要断言 MsgSender 的方法被调用了, 如果没有用到 MsgSender 的方法, 那么就不需要这个测试
需要自己写别的测试
"""
mock_send_method = Mock()
mock_msg_sender = mocker.patch("sql.notify.MsgSender")
mocker.patch("sql.models.WorkflowAudit.get_workflow")
setattr(mock_msg_sender.return_value, method_assert_called, mock_send_method)
notifier = notifier_to_test(
workflow=None, audit=create_audit_workflow, sys_config=SysConfig()
)
notifier.messages = [
LegacyMessage(msg_to=[Mock()], msg_title="test", msg_content="test")
]
notifier.send()
mock_send_method.assert_called_once()
def test_override_sys_key():
"""dataclass 的继承有时候让人有点困惑, 在这里补一个测试确认可以正常覆盖一些值"""
class OverrideNotifier(Notifier):
sys_config_key = "test"
n = OverrideNotifier(workflow=Mock())
assert n.sys_config_key == "test"