"""用户域负向测试:限流、越权、字段校验。""" from unittest.mock import patch from django.core.cache import cache from rest_framework.test import APIClient from apps.user.models import User from apps.user.throttling import SmsPhoneMinuteThrottle, RegisterIpThrottle from .conftest import ( CacheTestCase, USER_SEND_CODE_URL, USER_REGISTER_URL, USER_LOGIN_URL, USER_RESET_PWD_URL, USER_CHANGE_PWD_URL, USER_ME_URL, USER_LOGOUT_URL, USER_REFRESH_URL, USER_LIST_URL, user_detail_url, DEFAULT_INSTITUTION_CODE, DEFAULT_INSTITUTION_NAME, inject_sms_code, create_test_user, get_auth_client, get_tokens, create_teacher_student_relation, ensure_institution, ) class UserNegativeTest(CacheTestCase): """用户域负向路径测试。""" def setUp(self): super().setUp() self.client = APIClient() # ── 限流 ───────────────────────────────────────────────────────────── def test_rate_limit_sms_429(self): """N1: SMS 限流 → 429""" with ( patch.object(SmsPhoneMinuteThrottle, 'allow_request', return_value=False), patch.object(SmsPhoneMinuteThrottle, 'wait', return_value=60), ): resp = self.client.post(USER_SEND_CODE_URL, { 'phone': '13800001001', 'scene': 'register', }) self.assertEqual(resp.status_code, 429, resp.content) self.assertEqual(resp.json()['code'], 'SYS_RATE_LIMIT') # ── 越权 ───────────────────────────────────────────────────────────── def test_unauth_change_password_401(self): """N2: 未登录 POST change-password → 401""" resp = self.client.post(USER_CHANGE_PWD_URL, { 'old_password': 'x', 'new_password': 'y', }) self.assertEqual(resp.status_code, 401, resp.content) def test_unauth_me_401(self): """N3: 未登录 GET /me → 401""" resp = self.client.get(USER_ME_URL) self.assertEqual(resp.status_code, 401, resp.content) # ── 字段校验 ───────────────────────────────────────────────────────── def _admin_client(self, phone='13800000001'): """返回携带超级管理员 JWT 的客户端(U2 代注册需管理员身份)。""" admin = create_test_user(phone=phone, password='Admin123', role_type='super_admin') return get_auth_client(admin) def test_register_invalid_phone_400(self): """N4: 手机号格式不合法 → 400 SMS_INVALID_PHONE""" client = self._admin_client() with patch.object(RegisterIpThrottle, 'allow_request', return_value=True): resp = client.post(USER_REGISTER_URL, { 'phone': '123', 'real_name': '测试', 'institution_code': DEFAULT_INSTITUTION_CODE, 'institution_name': DEFAULT_INSTITUTION_NAME, }) self.assertEqual(resp.status_code, 400, resp.content) self.assertEqual(resp.json()['code'], 'SMS_INVALID_PHONE') def test_register_missing_institution_400(self): """N5: 注册缺少机构编码 → 400 USER_INSTITUTION_CODE_REQUIRED""" phone = '13800001002' client = self._admin_client() with patch.object(RegisterIpThrottle, 'allow_request', return_value=True): resp = client.post(USER_REGISTER_URL, { 'phone': phone, 'real_name': '测试缺机构', 'role_type': 'student', }) self.assertEqual(resp.status_code, 400, resp.content) self.assertEqual(resp.json()['code'], 'USER_INSTITUTION_CODE_REQUIRED') def test_register_duplicate_phone_400(self): """N6: 已注册手机号再注册 → 400 AUTH_PHONE_REGISTERED""" phone = '13800001003' create_test_user(phone=phone) client = self._admin_client() with patch.object(RegisterIpThrottle, 'allow_request', return_value=True): resp = client.post(USER_REGISTER_URL, { 'phone': phone, 'real_name': '重复注册', 'institution_code': DEFAULT_INSTITUTION_CODE, 'institution_name': DEFAULT_INSTITUTION_NAME, }) self.assertEqual(resp.status_code, 400, resp.content) self.assertEqual(resp.json()['code'], 'AUTH_PHONE_REGISTERED') # ── 代注册权限 ─────────────────────────────────────────────────────── def test_register_unauth_401(self): """N-REG1: 未登录调用代注册 → 401""" with patch.object(RegisterIpThrottle, 'allow_request', return_value=True): resp = self.client.post(USER_REGISTER_URL, { 'phone': '13800001007', 'real_name': '匿名', 'institution_code': DEFAULT_INSTITUTION_CODE, 'institution_name': DEFAULT_INSTITUTION_NAME, }) self.assertEqual(resp.status_code, 401, resp.content) def test_register_non_admin_403(self): """N-REG2: 非管理员(doctor)调用代注册 → 403 USER_NO_REGISTER_PERMISSION""" doctor = create_test_user(phone='13800001008', password='Doc12345', role_type='doctor') client = get_auth_client(doctor) with patch.object(RegisterIpThrottle, 'allow_request', return_value=True): resp = client.post(USER_REGISTER_URL, { 'phone': '13800001009', 'real_name': '被注册', 'role_type': 'student', 'institution_code': DEFAULT_INSTITUTION_CODE, 'institution_name': DEFAULT_INSTITUTION_NAME, }) self.assertEqual(resp.status_code, 403, resp.content) self.assertEqual(resp.json()['code'], 'USER_NO_REGISTER_PERMISSION') def test_register_hospital_admin_cannot_create_super_admin_403(self): """N-REG3: 医院管理员创建超级管理员 → 403 USER_NO_REGISTER_ROLE_PERMISSION""" hadmin = create_test_user(phone='13800001010', password='Hosp1234', role_type='hospital_admin') client = get_auth_client(hadmin) with patch.object(RegisterIpThrottle, 'allow_request', return_value=True): resp = client.post(USER_REGISTER_URL, { 'phone': '13800001011', 'real_name': '超管', 'role_type': 'super_admin', 'institution_code': DEFAULT_INSTITUTION_CODE, 'institution_name': DEFAULT_INSTITUTION_NAME, }) self.assertEqual(resp.status_code, 403, resp.content) self.assertEqual(resp.json()['code'], 'USER_NO_REGISTER_ROLE_PERMISSION') def test_register_hospital_admin_creates_student_ok(self): """N-REG4: 医院管理员在本机构创建学生 → 201(新用户机构=管理员机构)""" inst = ensure_institution(name=DEFAULT_INSTITUTION_NAME, code=DEFAULT_INSTITUTION_CODE) hadmin = create_test_user(phone='13800001012', password='Hosp1234', role_type='hospital_admin', institution=inst) client = get_auth_client(hadmin) with patch.object(RegisterIpThrottle, 'allow_request', return_value=True): resp = client.post(USER_REGISTER_URL, { 'phone': '13800001013', 'real_name': '新学生', 'role_type': 'student', 'institution_code': DEFAULT_INSTITUTION_CODE, 'institution_name': DEFAULT_INSTITUTION_NAME, }) self.assertEqual(resp.status_code, 201, resp.content) # 新用户机构应等于医院管理员所属机构 new_user = User.objects.get(phone='13800001013') self.assertEqual(new_user.institution_id, inst.id) def test_register_hospital_admin_cross_institution_403(self): """N-REG5: 医院管理员跨机构建账号 → 403 USER_INSTITUTION_SCOPE_FORBIDDEN""" inst_a = ensure_institution(name='医院A', code='HOSP-A') ensure_institution(name='医院B', code='HOSP-B') hadmin = create_test_user(phone='13800001014', password='Hosp1234', role_type='hospital_admin', institution=inst_a) client = get_auth_client(hadmin) with patch.object(RegisterIpThrottle, 'allow_request', return_value=True): resp = client.post(USER_REGISTER_URL, { 'phone': '13800001015', 'real_name': '跨机构学生', 'role_type': 'student', 'institution_code': 'HOSP-B', 'institution_name': '医院B', }) self.assertEqual(resp.status_code, 403, resp.content) self.assertEqual(resp.json()['code'], 'USER_INSTITUTION_SCOPE_FORBIDDEN') def test_register_hospital_admin_no_institution_403(self): """N-REG6: 无所属机构的医院管理员代注册 → 403 USER_NO_REGISTER_INSTITUTION""" hadmin = create_test_user(phone='13800001016', password='Hosp1234', role_type='hospital_admin') # institution=None client = get_auth_client(hadmin) with patch.object(RegisterIpThrottle, 'allow_request', return_value=True): resp = client.post(USER_REGISTER_URL, { 'phone': '13800001017', 'real_name': '无机构学生', 'role_type': 'student', 'institution_code': DEFAULT_INSTITUTION_CODE, 'institution_name': DEFAULT_INSTITUTION_NAME, }) self.assertEqual(resp.status_code, 403, resp.content) self.assertEqual(resp.json()['code'], 'USER_NO_REGISTER_INSTITUTION') def test_login_wrong_password(self): """N7: 错误密码 → 400 AUTH_BAD_CREDENTIALS""" phone = '13800001004' create_test_user(phone=phone, password='RealPass1', role_type='doctor') resp = self.client.post(USER_LOGIN_URL, { 'account': phone, 'password': 'WrongPass1', 'role': 'doctor', }) self.assertIn(resp.status_code, (400, 401)) self.assertEqual(resp.json()['code'], 'AUTH_BAD_CREDENTIALS') def test_login_account_lock_423(self): """N8: 连续 5 次错误后第 6 次 → 423 AUTH_ACCOUNT_LOCKED""" phone = '13800001005' create_test_user(phone=phone, password='RealPass1', role_type='doctor') # 连续 5 次错误密码 for _ in range(5): self.client.post(USER_LOGIN_URL, { 'account': phone, 'password': 'Wrong!!!!', 'role': 'doctor', }) # 第 6 次 resp = self.client.post(USER_LOGIN_URL, { 'account': phone, 'password': 'Wrong!!!!', 'role': 'doctor', }) self.assertEqual(resp.status_code, 423, resp.content) self.assertEqual(resp.json()['code'], 'AUTH_ACCOUNT_LOCKED') def test_reset_wrong_code(self): """N9: 重置密码验证码不匹配 → AUTH_CODE_MISMATCH""" phone = '13800001006' create_test_user(phone=phone, password='OldPass1') inject_sms_code(phone, 'reset', code='123456') from apps.user.throttling import ResetPhoneThrottle with patch.object(ResetPhoneThrottle, 'allow_request', return_value=True): resp = self.client.post(USER_RESET_PWD_URL, { 'phone': phone, 'code': '999999', # 错误验证码 'new_password': 'NewPass1', }) # AUTH_CODE_MISMATCH raises with default status_code or explicit status self.assertIn(resp.status_code, (400, 401)) self.assertEqual(resp.json()['code'], 'AUTH_CODE_MISMATCH') def test_refresh_revoked_token_401(self): """N10: logout 后用旧 refresh → 401""" phone = '13800001007' user = create_test_user(phone=phone) tokens = get_tokens(user) # logout(吊销 refresh) self.client.post(USER_LOGOUT_URL, {'refresh': tokens['refresh']}) # 尝试用已吊销的 refresh 刷新 resp = self.client.post(USER_REFRESH_URL, {'refresh': tokens['refresh']}) self.assertEqual(resp.status_code, 401, resp.content) self.assertEqual(resp.json()['code'], 'AUTH_TOKEN_INVALID') class UserListDetailNegativeTest(CacheTestCase): """U9 用户列表 + U10 用户详情 负向测试。""" def setUp(self): super().setUp() self.client = APIClient() # ── U9 列表权限 ────────────────────────────────────────────────────── def test_student_list_403(self): """N11: student GET /users/ → 403 USER_NO_LIST_PERMISSION""" student = create_test_user( phone='13800002001', password='Stu12345', real_name='学生', role_type='student', ) client = get_auth_client(student) resp = client.get(USER_LIST_URL) self.assertEqual(resp.status_code, 403, resp.content) self.assertEqual(resp.json()['code'], 'USER_NO_LIST_PERMISSION') def test_doctor_list_403(self): """N12: doctor GET /users/ → 403 USER_NO_LIST_PERMISSION""" doctor = create_test_user( phone='13800002002', password='Doc12345', real_name='医生', role_type='doctor', ) client = get_auth_client(doctor) resp = client.get(USER_LIST_URL) self.assertEqual(resp.status_code, 403, resp.content) self.assertEqual(resp.json()['code'], 'USER_NO_LIST_PERMISSION') def test_unauth_list_401(self): """N13: 未登录 GET /users/ → 401""" resp = self.client.get(USER_LIST_URL) self.assertEqual(resp.status_code, 401, resp.content) # ── U10 详情权限 ───────────────────────────────────────────────────── def test_unauth_detail_401(self): """N14: 未登录 GET /users/{id}/ → 401""" user = create_test_user(phone='13800002010', password='Pass1234') resp = self.client.get(user_detail_url(user.id)) self.assertEqual(resp.status_code, 401, resp.content) def test_student_view_other_student_403(self): """N15: student A 查看 student B 详情 → 403 USER_NO_VIEW_PERMISSION""" stu_a = create_test_user( phone='13800002020', password='Stu12345', real_name='学生A', role_type='student', ) stu_b = create_test_user( phone='13800002021', password='Stu12345', real_name='学生B', role_type='student', ) client = get_auth_client(stu_a) resp = client.get(user_detail_url(stu_b.id)) self.assertEqual(resp.status_code, 403, resp.content) self.assertEqual(resp.json()['code'], 'USER_NO_VIEW_PERMISSION') def test_teacher_view_unrelated_student_403(self): """N16: teacher 查看非名下学生详情 → 403 USER_NO_VIEW_PERMISSION""" teacher = create_test_user( phone='13800002030', password='Teacher1', real_name='刘老师', role_type='teacher', ) unrelated = create_test_user( phone='13800002031', password='Stu12345', real_name='非名下学生', role_type='student', ) # 无师生关系 client = get_auth_client(teacher) resp = client.get(user_detail_url(unrelated.id)) self.assertEqual(resp.status_code, 403, resp.content) self.assertEqual(resp.json()['code'], 'USER_NO_VIEW_PERMISSION') def test_teacher_view_ended_relation_student_403(self): """N17: teacher 查看已结束关系学生详情 → 403 USER_NO_VIEW_PERMISSION""" teacher = create_test_user( phone='13800002040', password='Teacher1', real_name='陈老师', role_type='teacher', ) student = create_test_user( phone='13800002041', password='Stu12345', real_name='已毕业学生', role_type='student', ) create_teacher_student_relation(teacher, student, status=0) # 已结束 client = get_auth_client(teacher) resp = client.get(user_detail_url(student.id)) self.assertEqual(resp.status_code, 403, resp.content) self.assertEqual(resp.json()['code'], 'USER_NO_VIEW_PERMISSION')