Files
2026-06-05 15:36:31 +08:00

346 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""用户域负向测试:限流、越权、字段校验。"""
from unittest.mock import patch
from django.core.cache import cache
from rest_framework.test import APIClient
from apps.user.models import User
from apps.user.throttling import SmsPhoneMinuteThrottle, RegisterIpThrottle
from .conftest import (
CacheTestCase,
USER_SEND_CODE_URL, USER_REGISTER_URL, USER_LOGIN_URL,
USER_RESET_PWD_URL, USER_CHANGE_PWD_URL, USER_ME_URL,
USER_LOGOUT_URL, USER_REFRESH_URL,
USER_LIST_URL, user_detail_url,
DEFAULT_INSTITUTION_CODE, DEFAULT_INSTITUTION_NAME,
inject_sms_code, create_test_user, get_auth_client, get_tokens,
create_teacher_student_relation, ensure_institution,
)
class UserNegativeTest(CacheTestCase):
    """Negative-path tests for the user domain.

    Covers rate limiting, unauthenticated/unauthorized access, field
    validation, proxy-registration permissions, login failures, password
    reset, and refresh-token revocation.
    """

    def setUp(self):
        """Run the base-class setup, then attach a fresh unauthenticated client."""
        super().setUp()
        self.client = APIClient()

    # ── Shared helpers ───────────────────────────────────────────────────

    def _admin_client(self, phone='13800000001'):
        """Create a ``super_admin`` user and return its ``get_auth_client`` client.

        Proxy registration (U2) has to be called with an admin identity, so
        the validation tests below use this client.
        """
        admin = create_test_user(phone=phone, password='Admin123', role_type='super_admin')
        return get_auth_client(admin)

    def _post_register(self, client, payload):
        """POST ``payload`` to the register URL with the IP throttle forced open.

        ``RegisterIpThrottle.allow_request`` is patched to return ``True`` so
        the request under test is never rejected by that throttle.
        """
        with patch.object(RegisterIpThrottle, 'allow_request', return_value=True):
            return client.post(USER_REGISTER_URL, payload)

    def _assert_response(self, resp, status, code=None):
        """Assert the HTTP status and, optionally, the JSON ``code`` field.

        Args:
            resp: Response returned by the test client.
            status: Expected status code, or a tuple of acceptable codes.
            code: Expected value of ``resp.json()['code']``; skipped when
                ``None``.

        The raw response body is always attached as the failure message so a
        failing status assertion shows what the server actually returned.
        """
        if isinstance(status, tuple):
            self.assertIn(resp.status_code, status, resp.content)
        else:
            self.assertEqual(resp.status_code, status, resp.content)
        if code is not None:
            self.assertEqual(resp.json()['code'], code)

    # ── Rate limiting ────────────────────────────────────────────────────

    def test_rate_limit_sms_429(self):
        """N1: SMS send-code request rejected by the throttle → 429 SYS_RATE_LIMIT."""
        # Force the per-phone throttle to reject; wait() is stubbed to return 60.
        with (
            patch.object(SmsPhoneMinuteThrottle, 'allow_request', return_value=False),
            patch.object(SmsPhoneMinuteThrottle, 'wait', return_value=60),
        ):
            resp = self.client.post(USER_SEND_CODE_URL, {
                'phone': '13800001001', 'scene': 'register',
            })
        self._assert_response(resp, 429, 'SYS_RATE_LIMIT')

    # ── Unauthenticated access ───────────────────────────────────────────

    def test_unauth_change_password_401(self):
        """N2: POST change-password without logging in → 401."""
        resp = self.client.post(USER_CHANGE_PWD_URL, {
            'old_password': 'x', 'new_password': 'y',
        })
        self._assert_response(resp, 401)

    def test_unauth_me_401(self):
        """N3: GET /me without logging in → 401."""
        resp = self.client.get(USER_ME_URL)
        self._assert_response(resp, 401)

    # ── Field validation ─────────────────────────────────────────────────

    def test_register_invalid_phone_400(self):
        """N4: malformed phone number → 400 SMS_INVALID_PHONE."""
        client = self._admin_client()
        resp = self._post_register(client, {
            'phone': '123',
            'real_name': '测试',
            'institution_code': DEFAULT_INSTITUTION_CODE,
            'institution_name': DEFAULT_INSTITUTION_NAME,
        })
        self._assert_response(resp, 400, 'SMS_INVALID_PHONE')

    def test_register_missing_institution_400(self):
        """N5: register without an institution code → 400 USER_INSTITUTION_CODE_REQUIRED."""
        phone = '13800001002'
        client = self._admin_client()
        resp = self._post_register(client, {
            'phone': phone,
            'real_name': '测试缺机构',
            'role_type': 'student',
        })
        self._assert_response(resp, 400, 'USER_INSTITUTION_CODE_REQUIRED')

    def test_register_duplicate_phone_400(self):
        """N6: register a phone that already has an account → 400 AUTH_PHONE_REGISTERED."""
        phone = '13800001003'
        create_test_user(phone=phone)
        client = self._admin_client()
        resp = self._post_register(client, {
            'phone': phone,
            'real_name': '重复注册',
            'institution_code': DEFAULT_INSTITUTION_CODE,
            'institution_name': DEFAULT_INSTITUTION_NAME,
        })
        self._assert_response(resp, 400, 'AUTH_PHONE_REGISTERED')

    # ── Proxy-registration permissions ───────────────────────────────────

    def test_register_unauth_401(self):
        """N-REG1: proxy registration without logging in → 401."""
        resp = self._post_register(self.client, {
            'phone': '13800001007', 'real_name': '匿名',
            'institution_code': DEFAULT_INSTITUTION_CODE,
            'institution_name': DEFAULT_INSTITUTION_NAME,
        })
        self._assert_response(resp, 401)

    def test_register_non_admin_403(self):
        """N-REG2: non-admin (doctor) calls proxy registration → 403 USER_NO_REGISTER_PERMISSION."""
        doctor = create_test_user(phone='13800001008', password='Doc12345', role_type='doctor')
        client = get_auth_client(doctor)
        resp = self._post_register(client, {
            'phone': '13800001009', 'real_name': '被注册',
            'role_type': 'student',
            'institution_code': DEFAULT_INSTITUTION_CODE,
            'institution_name': DEFAULT_INSTITUTION_NAME,
        })
        self._assert_response(resp, 403, 'USER_NO_REGISTER_PERMISSION')

    def test_register_hospital_admin_cannot_create_super_admin_403(self):
        """N-REG3: hospital admin creates a super admin → 403 USER_NO_REGISTER_ROLE_PERMISSION."""
        hadmin = create_test_user(phone='13800001010', password='Hosp1234', role_type='hospital_admin')
        client = get_auth_client(hadmin)
        resp = self._post_register(client, {
            'phone': '13800001011', 'real_name': '超管',
            'role_type': 'super_admin',
            'institution_code': DEFAULT_INSTITUTION_CODE,
            'institution_name': DEFAULT_INSTITUTION_NAME,
        })
        self._assert_response(resp, 403, 'USER_NO_REGISTER_ROLE_PERMISSION')

    def test_register_hospital_admin_creates_student_ok(self):
        """N-REG4: hospital admin creates a student in its own institution → 201.

        The new user's institution must be the admin's institution.
        """
        inst = ensure_institution(name=DEFAULT_INSTITUTION_NAME, code=DEFAULT_INSTITUTION_CODE)
        hadmin = create_test_user(phone='13800001012', password='Hosp1234',
                                  role_type='hospital_admin', institution=inst)
        client = get_auth_client(hadmin)
        resp = self._post_register(client, {
            'phone': '13800001013', 'real_name': '新学生',
            'role_type': 'student',
            'institution_code': DEFAULT_INSTITUTION_CODE,
            'institution_name': DEFAULT_INSTITUTION_NAME,
        })
        self._assert_response(resp, 201)
        # The created user must belong to the hospital admin's institution.
        new_user = User.objects.get(phone='13800001013')
        self.assertEqual(new_user.institution_id, inst.id)

    def test_register_hospital_admin_cross_institution_403(self):
        """N-REG5: hospital admin registers into another institution → 403 USER_INSTITUTION_SCOPE_FORBIDDEN."""
        inst_a = ensure_institution(name='医院A', code='HOSP-A')
        ensure_institution(name='医院B', code='HOSP-B')
        hadmin = create_test_user(phone='13800001014', password='Hosp1234',
                                  role_type='hospital_admin', institution=inst_a)
        client = get_auth_client(hadmin)
        # The admin belongs to HOSP-A but targets HOSP-B.
        resp = self._post_register(client, {
            'phone': '13800001015', 'real_name': '跨机构学生',
            'role_type': 'student',
            'institution_code': 'HOSP-B',
            'institution_name': '医院B',
        })
        self._assert_response(resp, 403, 'USER_INSTITUTION_SCOPE_FORBIDDEN')

    def test_register_hospital_admin_no_institution_403(self):
        """N-REG6: hospital admin with no institution proxy-registers → 403 USER_NO_REGISTER_INSTITUTION."""
        # No institution argument is passed (the original author notes institution=None).
        hadmin = create_test_user(phone='13800001016', password='Hosp1234',
                                  role_type='hospital_admin')
        client = get_auth_client(hadmin)
        resp = self._post_register(client, {
            'phone': '13800001017', 'real_name': '无机构学生',
            'role_type': 'student',
            'institution_code': DEFAULT_INSTITUTION_CODE,
            'institution_name': DEFAULT_INSTITUTION_NAME,
        })
        self._assert_response(resp, 403, 'USER_NO_REGISTER_INSTITUTION')

    # ── Login / reset / refresh ──────────────────────────────────────────

    def test_login_wrong_password(self):
        """N7: wrong password → 400 or 401 with AUTH_BAD_CREDENTIALS."""
        phone = '13800001004'
        create_test_user(phone=phone, password='RealPass1', role_type='doctor')
        resp = self.client.post(USER_LOGIN_URL, {
            'account': phone, 'password': 'WrongPass1', 'role': 'doctor',
        })
        self._assert_response(resp, (400, 401), 'AUTH_BAD_CREDENTIALS')

    def test_login_account_lock_423(self):
        """N8: after 5 consecutive wrong passwords the 6th attempt → 423 AUTH_ACCOUNT_LOCKED."""
        phone = '13800001005'
        create_test_user(phone=phone, password='RealPass1', role_type='doctor')
        # Five consecutive wrong-password attempts.
        for _ in range(5):
            self.client.post(USER_LOGIN_URL, {
                'account': phone, 'password': 'Wrong!!!!', 'role': 'doctor',
            })
        # Sixth attempt.
        resp = self.client.post(USER_LOGIN_URL, {
            'account': phone, 'password': 'Wrong!!!!', 'role': 'doctor',
        })
        self._assert_response(resp, 423, 'AUTH_ACCOUNT_LOCKED')

    def test_reset_wrong_code(self):
        """N9: password reset with a non-matching SMS code → AUTH_CODE_MISMATCH."""
        phone = '13800001006'
        create_test_user(phone=phone, password='OldPass1')
        inject_sms_code(phone, 'reset', code='123456')
        from apps.user.throttling import ResetPhoneThrottle
        with patch.object(ResetPhoneThrottle, 'allow_request', return_value=True):
            resp = self.client.post(USER_RESET_PWD_URL, {
                'phone': phone,
                'code': '999999',  # deliberately differs from the injected code
                'new_password': 'NewPass1',
            })
        # AUTH_CODE_MISMATCH may be raised with a default or an explicit
        # status code, so either 400 or 401 is accepted.
        self._assert_response(resp, (400, 401), 'AUTH_CODE_MISMATCH')

    def test_refresh_revoked_token_401(self):
        """N10: refresh with the old refresh token after logout → 401 AUTH_TOKEN_INVALID."""
        phone = '13800001007'
        user = create_test_user(phone=phone)
        tokens = get_tokens(user)
        # Log out (intended to revoke the refresh token).
        # NOTE(review): this call uses the unauthenticated client and its
        # response is not checked — confirm the logout endpoint accepts that,
        # otherwise the assertions below could pass for the wrong reason.
        self.client.post(USER_LOGOUT_URL, {'refresh': tokens['refresh']})
        # Try to refresh with the token that was just submitted to logout.
        resp = self.client.post(USER_REFRESH_URL, {'refresh': tokens['refresh']})
        self._assert_response(resp, 401, 'AUTH_TOKEN_INVALID')
class UserListDetailNegativeTest(CacheTestCase):
    """Negative-path tests for U9 (user list) and U10 (user detail)."""

    def setUp(self):
        """Run the base-class setup, then attach a fresh unauthenticated client."""
        super().setUp()
        self.client = APIClient()

    # ── U9: list permissions ─────────────────────────────────────────────

    def test_student_list_403(self):
        """N11: student GET /users/ → 403 USER_NO_LIST_PERMISSION."""
        viewer = create_test_user(
            phone='13800002001',
            password='Stu12345',
            real_name='学生',
            role_type='student',
        )
        response = get_auth_client(viewer).get(USER_LIST_URL)
        self.assertEqual(response.status_code, 403, response.content)
        self.assertEqual(response.json()['code'], 'USER_NO_LIST_PERMISSION')

    def test_doctor_list_403(self):
        """N12: doctor GET /users/ → 403 USER_NO_LIST_PERMISSION."""
        viewer = create_test_user(
            phone='13800002002',
            password='Doc12345',
            real_name='医生',
            role_type='doctor',
        )
        response = get_auth_client(viewer).get(USER_LIST_URL)
        self.assertEqual(response.status_code, 403, response.content)
        self.assertEqual(response.json()['code'], 'USER_NO_LIST_PERMISSION')

    def test_unauth_list_401(self):
        """N13: GET /users/ without logging in → 401."""
        response = self.client.get(USER_LIST_URL)
        self.assertEqual(response.status_code, 401, response.content)

    # ── U10: detail permissions ──────────────────────────────────────────

    def test_unauth_detail_401(self):
        """N14: GET /users/{id}/ without logging in → 401."""
        target = create_test_user(phone='13800002010', password='Pass1234')
        detail_url = user_detail_url(target.id)
        response = self.client.get(detail_url)
        self.assertEqual(response.status_code, 401, response.content)

    def test_student_view_other_student_403(self):
        """N15: student A requests student B's detail → 403 USER_NO_VIEW_PERMISSION."""
        viewer = create_test_user(
            phone='13800002020',
            password='Stu12345',
            real_name='学生A',
            role_type='student',
        )
        target = create_test_user(
            phone='13800002021',
            password='Stu12345',
            real_name='学生B',
            role_type='student',
        )
        response = get_auth_client(viewer).get(user_detail_url(target.id))
        self.assertEqual(response.status_code, 403, response.content)
        self.assertEqual(response.json()['code'], 'USER_NO_VIEW_PERMISSION')

    def test_teacher_view_unrelated_student_403(self):
        """N16: teacher requests a student not under them → 403 USER_NO_VIEW_PERMISSION."""
        viewer = create_test_user(
            phone='13800002030',
            password='Teacher1',
            real_name='刘老师',
            role_type='teacher',
        )
        target = create_test_user(
            phone='13800002031',
            password='Stu12345',
            real_name='非名下学生',
            role_type='student',
        )
        # Deliberately no teacher–student relation is created here.
        response = get_auth_client(viewer).get(user_detail_url(target.id))
        self.assertEqual(response.status_code, 403, response.content)
        self.assertEqual(response.json()['code'], 'USER_NO_VIEW_PERMISSION')

    def test_teacher_view_ended_relation_student_403(self):
        """N17: teacher requests a student whose relation has ended → 403 USER_NO_VIEW_PERMISSION."""
        viewer = create_test_user(
            phone='13800002040',
            password='Teacher1',
            real_name='陈老师',
            role_type='teacher',
        )
        target = create_test_user(
            phone='13800002041',
            password='Stu12345',
            real_name='已毕业学生',
            role_type='student',
        )
        # status=0 is the "ended" relation state per the original author's note.
        create_teacher_student_relation(viewer, target, status=0)
        response = get_auth_client(viewer).get(user_detail_url(target.id))
        self.assertEqual(response.status_code, 403, response.content)
        self.assertEqual(response.json()['code'], 'USER_NO_VIEW_PERMISSION')