Files
medical_training/test/swagger_tryout.py
T

716 lines
33 KiB
Python
Raw Normal View History

2026-05-29 15:58:00 +08:00
"""
Swagger Try-it-out 等效脚本:逐个调用所有接口,验证可达性和基本功能。
运行方式:.venv\\Scripts\\python.exe test/swagger_tryout.py
前提:Django dev server 已在 http://127.0.0.1:8000 运行,Redis 已启动;.env 指向目标库。
认证流程(与当前 API 一致):
U1 发码(login) → U2 管理员代注册(机构编码+名称, 默认密码 Pass+手机号)
→ U3 密码登录 → U4 验证码登录(机构信息) → U4-new 新号自动注册
→ U5 重置密码 → U6 改密 → U8 刷新 → /me → U9/U10 → U7 退出
病例段:C1 解析 exam_items → C3 full-create 写入 case_exam_item(校验响应条数与库表条数)
2026-05-29 15:58:00 +08:00
日志输出:logs/test-swagger-YYYY-MM-DD.log(含完整请求体和响应体)
"""
import io
import json
import sys
import time
import subprocess
from datetime import datetime
from pathlib import Path
import requests
# 修复 Windows GBK 编码问题
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
BASE = 'http://127.0.0.1:8000'
PYTHON = r'D:\01Agent\medical_training\.venv\Scripts\python.exe'
CWD = r'D:\01Agent\medical_training'
PASS = 'PASS'
FAIL = 'FAIL'
results = []
# ─── 日志文件 ─────────────────────────────────────────────────────────────────
LOGS_DIR = Path(CWD) / 'logs'
LOGS_DIR.mkdir(exist_ok=True)
LOG_FILE = LOGS_DIR / f'test-swagger-{datetime.now().strftime("%Y-%m-%d")}.log'
_log_fh = open(LOG_FILE, 'a', encoding='utf-8')
def _write_log(text):
"""同时写入日志文件和控制台。"""
_log_fh.write(text + '\n')
_log_fh.flush()
print(text)
def _format_headers(headers):
"""将 requests 的 headers 格式化为 JSON 字符串。"""
if not headers:
return ''
return json.dumps(dict(headers), ensure_ascii=False)
def log(api_id, method, url, expected, actual, detail='',
req_headers=None, req_body=None, resp_headers=None, resp_body=None):
status = PASS if actual in (expected if isinstance(expected, (list, tuple)) else [expected]) else FAIL
results.append((api_id, method, url, expected, actual, status))
exp_str = str(expected)
_write_log(f' {status} {api_id:<6} {method:<6} {url:<50} expect={exp_str:<20} got={actual} {detail}')
# 详细请求/响应写入日志文件(不打印到控制台,避免过长)
if req_headers is not None:
_log_fh.write(f' >>> headers: {_format_headers(req_headers)}\n')
if req_body is not None:
body_str = json.dumps(req_body, ensure_ascii=False, indent=2) if isinstance(req_body, dict) else str(req_body)
_log_fh.write(f' >>> body: {body_str}\n')
if resp_headers is not None:
_log_fh.write(f' <<< headers: {_format_headers(resp_headers)}\n')
if resp_body is not None:
body_str = json.dumps(resp_body, ensure_ascii=False, indent=2) if isinstance(resp_body, dict) else str(resp_body)
# 截断过长响应
if len(body_str) > 2000:
body_str = body_str[:2000] + f'... (truncated, {len(body_str)} chars total)'
_log_fh.write(f' <<< body: {body_str}\n')
_log_fh.flush()
def section(title):
line = f'\n{"="*90}\n {title}\n{"="*90}'
_write_log(line)
def django_eval(code):
"""在独立进程中执行 Django 代码并返回 stdout。"""
preamble = (
'import django, os; '
'os.environ.setdefault("DJANGO_SETTINGS_MODULE","config.settings"); '
'django.setup(); '
)
proc = subprocess.run(
[PYTHON, '-c', preamble + code],
capture_output=True, text=True, cwd=CWD,
)
return proc.stdout.strip()
def get_sms_code(phone, scene):
"""从 Redis 读取短信验证码。"""
val = django_eval(
f'from django.core.cache import cache; print(cache.get("sms:{scene}:{phone}"))'
)
return val if val and val != 'None' else None
def count_case_exam_items(case_id):
"""查询 medical_platform.case_exam_item 中该病例的检查项条数。"""
val = django_eval(
f'from apps.case.models import CaseExamItem; '
f'print(CaseExamItem.objects.filter(case_id={case_id}).count())'
)
return int(val) if val and val.isdigit() else 0
def list_case_exam_item_codes(case_id):
"""返回该病例在库中的 item_code 列表(逗号分隔)。"""
val = django_eval(
f'from apps.case.models import CaseExamItem; '
f'codes = list(CaseExamItem.objects.filter(case_id={case_id}).order_by("display_order", "id").values_list("item_code", flat=True)); '
f'print(",".join(codes))'
)
return val or ''
2026-05-29 15:58:00 +08:00
def inject_sms_code(phone, scene, code='123456'):
"""手动注入短信验证码到 Redis。"""
django_eval(
f'from django.core.cache import cache; '
f'cache.set("sms:{scene}:{phone}", "{code}", 300)'
)
return code
# ─── 清理 Redis 残留数据(限流计数等)────────────────────────────────────────
print('\n[准备] 清理 Redis 缓存...')
django_eval('from django.core.cache import cache; cache.clear(); print("OK")')
# 删除上次可能残留的测试用户
2026-06-05 15:36:31 +08:00
SUPER_PHONE = '13700000090' # 超级管理员,执行 U2 代注册
SUPER_PWD = 'Super123'
PHONE = '13700000099' # CMS 用户(doctor),走 U3 密码登录
STUDENT_PHONE_U4 = '13700000097' # 已录入学生,走 U4 验证码登录(非试用机构需机构匹配)
PHONE_ALT = '13700000098' # U4 试用机构自动注册专用
INST_CODE = 'SWAG_TEST_HOSP'
INST_NAME = 'Swagger测试医院'
2026-06-05 15:36:31 +08:00
# 预留试用机构(与 apps.user.auth 常量保持一致)
TRIAL_INST_CODE = 'PKU_LAB_TRIAL'
TRIAL_INST_NAME = '北大医学部(实验室)试用'
CMS_ROLE = 'doctor' # PHONE 用户的 CMS 角色
# 全库唯一科室名,避免 resolve_department("儿科") 命中多条 → CASE_DEPARTMENT_AMBIGUOUS
DEPT_NAME = 'Swagger儿科'
# C3 手工兜底时的检查项(C1 有解析结果时优先用 AI 的 exam_items
SAMPLE_EXAM_ITEMS = [
{
'item_code': 'blood_routine',
'item_name': '血常规',
'item_type': 'lab',
'category': '实验室检查',
'result_text': 'WBC 10×10^9/L',
'result_structured': {'wbc': '10×10^9/L'},
'is_key': True,
'is_abnormal': False,
'score_weight': 1.0,
'display_order': 1,
},
{
'item_code': 'crp',
'item_name': 'CRP',
'item_type': 'lab',
'category': '实验室检查',
'result_text': 'CRP 8 mg/L',
'is_key': False,
'is_abnormal': False,
'score_weight': 1.0,
'display_order': 2,
},
]
2026-05-29 15:58:00 +08:00
django_eval(
f'from apps.user.models import User; '
2026-06-05 15:36:31 +08:00
f'User.objects.filter(phone__in=["{SUPER_PHONE}","{PHONE}","{STUDENT_PHONE_U4}","{PHONE_ALT}"]).delete(); print("cleaned")'
)
# 预置超级管理员(U2 代注册需管理员身份)
django_eval(
f'from apps.user.models import User; '
f'User.objects.create_user(username="{SUPER_PHONE}", password="{SUPER_PWD}", '
f' phone="{SUPER_PHONE}", real_name="Swagger超管", role_type="super_admin", status=1); '
f'print("super ok")'
)
# 预置试用机构(仅增数据,不改表)
django_eval(
f'from apps.user.models import Institution; '
f'Institution.objects.get_or_create(code="{TRIAL_INST_CODE}", '
f' defaults={{"name":"{TRIAL_INST_NAME}","type":"hospital"}}); print("trial ok")'
2026-05-29 15:58:00 +08:00
)
print('[准备] 完成\n')
s = requests.Session()
# U2 管理员代注册默认密码:Pass + 手机号
PASSWORD = f'Pass{PHONE}'
def _institution_fields():
return {'institution_code': INST_CODE, 'institution_name': INST_NAME}
2026-05-29 15:58:00 +08:00
2026-06-05 15:36:31 +08:00
def get_user_access_token(phone):
"""直接为指定用户签发 access token(用于非 CMS 角色,如 teacher 无法走 U3)。"""
return django_eval(
f'from apps.user.models import User; '
f'from rest_framework_simplejwt.tokens import RefreshToken; '
f'u=User.objects.get(phone="{phone}"); '
f'print(str(RefreshToken.for_user(u).access_token))'
)
2026-05-29 15:58:00 +08:00
# ═══════════════════════════════════════════════════════════════════════════════
section('用户端接口 (U1-U10)')
# ═══════════════════════════════════════════════════════════════════════════════
access = ''
refresh = ''
auth = {}
2026-06-05 15:36:31 +08:00
# INST-LIST: 移动端机构列表(不分页,登录前可调用)
r = s.get(f'{BASE}/api/user/institution_list/')
inst_list_detail = ''
if r.status_code == 200:
items = r.json()
trial_flags = [i for i in items if i.get('is_trial')]
inst_list_detail = f'count={len(items)}, trial={len(trial_flags)}'
log('INST-LIST', 'GET', '/api/user/institution_list/', 200, r.status_code, inst_list_detail,
req_headers=r.request.headers, resp_headers=dict(r.headers),
resp_body=r.json() if r.headers.get('content-type', '').startswith('application/json') else None)
# U1: 发送验证码(login 场景)
u1_body = {'phone': PHONE, 'scene': 'login'}
2026-05-29 15:58:00 +08:00
r = s.post(f'{BASE}/api/user/auth/send-code/', json=u1_body)
log('U1', 'POST', '/api/user/auth/send-code/', 200, r.status_code,
req_headers=r.request.headers, req_body=u1_body,
resp_headers=dict(r.headers), resp_body=r.json())
2026-06-05 15:36:31 +08:00
# U2: 管理员代注册(仅超管/医院管理员)。超管创建 CMS 角色 doctor,并自动创建机构
super_access = get_user_access_token(SUPER_PHONE)
super_auth = {'Authorization': f'Bearer {super_access}'}
u2_body = {
'phone': PHONE,
'real_name': 'Swagger测试',
2026-06-05 15:36:31 +08:00
'role_type': CMS_ROLE,
**_institution_fields(),
}
2026-06-05 15:36:31 +08:00
r = s.post(f'{BASE}/api/user/auth/register/', json=u2_body, headers=super_auth)
u2_detail = 'has_tokens=%s' % ('tokens' in (r.json() if r.headers.get('content-type','').startswith('application/json') else {}))
log('U2', 'POST', '/api/user/auth/register/ (super_admin)', 201, r.status_code, u2_detail,
2026-05-29 15:58:00 +08:00
req_headers=r.request.headers, req_body=u2_body,
resp_headers=dict(r.headers), resp_body=r.json())
2026-06-05 15:36:31 +08:00
# U2-tok: 代注册不返回 tokens(按管理员代注册语义)
u2_has_tokens = 'tokens' in (r.json() if r.headers.get('content-type','').startswith('application/json') else {})
log('U2-tok', 'CHECK', 'register response has no tokens', False, u2_has_tokens)
# U2-neg1: 未登录代注册 → 401
r = s.post(f'{BASE}/api/user/auth/register/', json=u2_body)
log('U2-neg1', 'POST', '/api/user/auth/register/ (anon)', 401, r.status_code,
f'code={r.json().get("code","")}' if r.headers.get("content-type","").startswith("application/json") else "",
req_body={'phone': PHONE, '...': '...'}, resp_body=r.json() if r.headers.get('content-type','').startswith('application/json') else None)
# U3: CMS 密码登录(账号 + 密码 + 角色,三者必填)
u3_body = {'account': PHONE, 'password': PASSWORD, 'role': CMS_ROLE}
2026-05-29 15:58:00 +08:00
r = s.post(f'{BASE}/api/user/auth/login/', json=u3_body)
log('U3', 'POST', '/api/user/auth/login/', 200, r.status_code,
req_headers=r.request.headers, req_body=u3_body,
resp_headers=dict(r.headers), resp_body=r.json())
if r.status_code == 200:
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}
2026-05-29 15:58:00 +08:00
2026-06-05 15:36:31 +08:00
# U3-neg: 缺少角色 → 400 AUTH_BAD_CREDENTIALS
r = s.post(f'{BASE}/api/user/auth/login/', json={'account': PHONE, 'password': PASSWORD})
log('U3-neg', 'POST', '/api/user/auth/login/ (no role)', 400, r.status_code,
f'code={r.json().get("code","")}', req_body={'account': PHONE, 'password': '***'},
resp_body=r.json())
# U2-neg2: 非管理员(doctor)代注册 → 403 USER_NO_REGISTER_PERMISSION
r = s.post(f'{BASE}/api/user/auth/register/',
json={'phone': '13700000095', 'real_name': '越权注册', 'role_type': 'student',
**_institution_fields()}, headers=auth)
log('U2-neg2', 'POST', '/api/user/auth/register/ (doctor)', 403, r.status_code,
f'code={r.json().get("code","")}' if r.headers.get("content-type","").startswith("application/json") else "",
resp_body=r.json() if r.headers.get('content-type','').startswith('application/json') else None)
# U2-ha / U2-ha-neg: 医院管理员仅能在本机构内代注册
HA_PHONE = '13700000094'
HA_STUDENT = '13700000093'
django_eval(
f'from apps.user.models import User, Institution; '
f'inst=Institution.objects.get(code="{INST_CODE}"); '
f'User.objects.filter(phone__in=["{HA_PHONE}","{HA_STUDENT}"]).delete(); '
f'User.objects.create_user(username="{HA_PHONE}", password="Hosp1234", phone="{HA_PHONE}", '
f' real_name="Swagger医院管理员", role_type="hospital_admin", institution=inst, status=1); '
f'print("hadmin ok")'
)
ha_auth = {'Authorization': f'Bearer {get_user_access_token(HA_PHONE)}'}
# U2-ha: 本机构建学生 → 201
ha_body = {'phone': HA_STUDENT, 'real_name': '本院学生', 'role_type': 'student', **_institution_fields()}
r = s.post(f'{BASE}/api/user/auth/register/', json=ha_body, headers=ha_auth)
log('U2-ha', 'POST', '/api/user/auth/register/ (hospital_admin own inst)', 201, r.status_code,
req_body=ha_body, resp_body=r.json() if r.headers.get('content-type','').startswith('application/json') else None)
# U2-ha-neg: 跨机构建账号 → 403 USER_INSTITUTION_SCOPE_FORBIDDEN
ha_neg_body = {'phone': '13700000092', 'real_name': '跨院学生', 'role_type': 'student',
'institution_code': 'SWAG_OTHER_HOSP', 'institution_name': 'Swagger其它医院'}
r = s.post(f'{BASE}/api/user/auth/register/', json=ha_neg_body, headers=ha_auth)
log('U2-ha-neg', 'POST', '/api/user/auth/register/ (hospital_admin cross inst)', 403, r.status_code,
f'code={r.json().get("code","")}' if r.headers.get("content-type","").startswith("application/json") else "",
req_body=ha_neg_body, resp_body=r.json() if r.headers.get('content-type','').startswith('application/json') else None)
django_eval(f'from apps.user.models import User; User.objects.filter(phone__in=["{HA_PHONE}","{HA_STUDENT}"]).delete()')
# U4: 验证码登录(移动端,已录入学生 + 机构匹配 → 200)
# 预创建学生,institution 与所选机构一致
django_eval(
f'from apps.user.models import User, Institution; '
f'inst=Institution.objects.get(code="{INST_CODE}"); '
f'User.objects.filter(phone="{STUDENT_PHONE_U4}").delete(); '
f'User.objects.create_user(username="{STUDENT_PHONE_U4}", password=None, '
f' phone="{STUDENT_PHONE_U4}", real_name="Swagger学生U4", role_type="student", '
f' institution=inst, status=1); print("student ok")'
)
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': STUDENT_PHONE_U4, 'scene': 'login'})
login_code = get_sms_code(STUDENT_PHONE_U4, 'login') or inject_sms_code(STUDENT_PHONE_U4, 'login')
u4_body = {'phone': STUDENT_PHONE_U4, 'code': login_code, 'institution_code': INST_CODE}
2026-05-29 15:58:00 +08:00
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_body)
2026-06-05 15:36:31 +08:00
u4_detail = f'is_new_user={r.json().get("is_new_user")}' if r.status_code in (200, 201) else f'code={r.json().get("code","")}'
log('U4', 'POST', '/api/user/auth/login-code/ (registered student)', 200, r.status_code, u4_detail,
2026-05-29 15:58:00 +08:00
req_headers=r.request.headers, req_body=u4_body,
resp_headers=dict(r.headers), resp_body=r.json())
2026-06-05 15:36:31 +08:00
# U4-neg: 非试用机构 + 未录入手机号 → 403 AUTH_NOT_REGISTERED
unreg_phone = '13700000096'
django_eval(f'from apps.user.models import User; User.objects.filter(phone="{unreg_phone}").delete()')
inject_sms_code(unreg_phone, 'login')
u4_neg_body = {'phone': unreg_phone, 'code': '123456', 'institution_code': INST_CODE}
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_neg_body)
log('U4-neg', 'POST', '/api/user/auth/login-code/ (unregistered, non-trial)', 403, r.status_code,
f'code={r.json().get("code","")}', req_body=u4_neg_body, resp_body=r.json())
# U4-new: 试用机构验证码登录自动注册(新手机号 → 201 is_new_user=true
django_eval(f'from apps.user.models import User; User.objects.filter(phone="{PHONE_ALT}").delete()')
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': PHONE_ALT, 'scene': 'login'})
log('U4-pre', 'POST', '/api/user/auth/send-code/ (alt)', 200, r.status_code,
req_body={'phone': PHONE_ALT, 'scene': 'login'})
alt_code = get_sms_code(PHONE_ALT, 'login') or inject_sms_code(PHONE_ALT, 'login')
2026-06-05 15:36:31 +08:00
u4_new_body = {'phone': PHONE_ALT, 'code': alt_code, 'institution_code': TRIAL_INST_CODE}
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_new_body)
u4_new_detail = ''
if r.status_code in (200, 201):
u4_new_detail = f'is_new_user={r.json().get("is_new_user")}'
2026-06-05 15:36:31 +08:00
log('U4-new', 'POST', '/api/user/auth/login-code/ (trial auto-register)', 201, r.status_code,
u4_new_detail, req_body=u4_new_body, resp_body=r.json() if r.headers.get('content-type', '').startswith('application/json') else None)
2026-06-05 15:36:31 +08:00
django_eval(f'from apps.user.models import User; User.objects.filter(phone__in=["{PHONE_ALT}","{STUDENT_PHONE_U4}","{unreg_phone}"]).delete()')
2026-05-29 15:58:00 +08:00
# U5: 重置密码
NEW_PASSWORD = 'SwagNew1'
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': PHONE, 'scene': 'reset'})
reset_code = get_sms_code(PHONE, 'reset')
if not reset_code:
reset_code = inject_sms_code(PHONE, 'reset', '111111')
u5_body = {'phone': PHONE, 'code': reset_code, 'new_password': NEW_PASSWORD}
r = s.post(f'{BASE}/api/user/auth/reset-password/', json=u5_body)
log('U5', 'POST', '/api/user/auth/reset-password/', 200, r.status_code,
req_headers=r.request.headers, req_body=u5_body,
resp_headers=dict(r.headers), resp_body=r.json())
# reset-password 调用 invalidate_user_tokens(time()+1),必须等 1s 再登录
time.sleep(1.2)
2026-06-05 15:36:31 +08:00
# 用新密码重新登录(CMS
r = s.post(f'{BASE}/api/user/auth/login/', json={'account': PHONE, 'password': NEW_PASSWORD, 'role': CMS_ROLE})
2026-05-29 15:58:00 +08:00
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}
# U6: 修改密码
FINAL_PASSWORD = 'SwagFin1'
u6_body = {'old_password': NEW_PASSWORD, 'new_password': FINAL_PASSWORD}
r = s.post(f'{BASE}/api/user/users/change-password/', json=u6_body, headers=auth)
log('U6', 'POST', '/api/user/users/change-password/', 200, r.status_code,
req_headers=r.request.headers, req_body=u6_body,
resp_headers=dict(r.headers), resp_body=r.json())
# change-password 同样 invalidate_user_tokens(time()+1)
time.sleep(1.2)
2026-06-05 15:36:31 +08:00
# 用最终密码重新登录(CMS
r = s.post(f'{BASE}/api/user/auth/login/', json={'account': PHONE, 'password': FINAL_PASSWORD, 'role': CMS_ROLE})
2026-05-29 15:58:00 +08:00
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}
# U8: 刷新 Token
u8_body = {'refresh': refresh}
r = s.post(f'{BASE}/api/user/auth/refresh/', json=u8_body)
log('U8', 'POST', '/api/user/auth/refresh/', 200, r.status_code,
req_headers=r.request.headers, req_body=u8_body,
resp_headers=dict(r.headers), resp_body=r.json())
if r.status_code == 200:
access = r.json().get('access', access)
auth = {'Authorization': f'Bearer {access}'}
# /me: GET /me
r = s.get(f'{BASE}/api/user/users/me/', headers=auth)
log('/me', 'GET', '/api/user/users/me/', 200, r.status_code,
f'phone={r.json().get("phone","")}' if r.status_code == 200 else '',
req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
test_user_id = r.json().get('id') if r.status_code == 200 else None
# ── U9/U10: 用户列表 + 用户详情 ──────────────────────────────────────────────
# 创建 admin、teacher、student 用户 + 师生关系
ADMIN_PHONE = '13700000088'
TEACHER_PHONE = '13700000077'
STUDENT_PHONE = '13700000066'
ROLE_PWD = 'RoleTest1'
django_eval(
f'from apps.user.models import User, TeacherStudentRelation; '
f'User.objects.filter(phone__in=["{ADMIN_PHONE}","{TEACHER_PHONE}","{STUDENT_PHONE}"]).delete(); '
f'admin = User.objects.create_user(username="{ADMIN_PHONE}", password="{ROLE_PWD}", '
f' phone="{ADMIN_PHONE}", real_name="Swagger管理员", role_type="super_admin", status=1); '
f'teacher = User.objects.create_user(username="{TEACHER_PHONE}", password="{ROLE_PWD}", '
2026-06-13 13:21:53 +08:00
f' phone="{TEACHER_PHONE}", real_name="Swagger带教医生", role_type="doctor", status=1); '
2026-05-29 15:58:00 +08:00
f'student = User.objects.create_user(username="{STUDENT_PHONE}", password="{ROLE_PWD}", '
f' phone="{STUDENT_PHONE}", real_name="Swagger学生", role_type="student", status=1); '
f'TeacherStudentRelation.objects.create(teacher=teacher, student=student, '
f' relation_type="指导", status=1); '
f'print(f"admin={{admin.id}} teacher={{teacher.id}} student={{student.id}}")'
)
2026-06-05 15:36:31 +08:00
# 管理员登录(CMSsuper_admin
r = s.post(f'{BASE}/api/user/auth/login/',
json={'account': ADMIN_PHONE, 'password': ROLE_PWD, 'role': 'super_admin'})
2026-05-29 15:58:00 +08:00
admin_tokens = r.json().get('tokens', {})
admin_auth = {'Authorization': f'Bearer {admin_tokens.get("access", "")}'}
admin_refresh = admin_tokens.get('refresh', '')
# U9: 管理员获取用户列表
r = s.get(f'{BASE}/api/user/users/', headers=admin_auth)
u9_detail = ''
if r.status_code == 200:
u9_data = r.json()
u9_results = u9_data.get('results', u9_data)
u9_detail = f'count={len(u9_results)}'
log('U9', 'GET', '/api/user/users/', 200, r.status_code, u9_detail,
req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
# U9-b: 教师获取用户列表(仅名下学生)
2026-06-05 15:36:31 +08:00
# teacher 角色不在 CMS 登录角色内(U3 仅 CMS 角色),直接签发 token 用于权限演示
teacher_access = get_user_access_token(TEACHER_PHONE)
teacher_auth = {'Authorization': f'Bearer {teacher_access}'}
2026-05-29 15:58:00 +08:00
r = s.get(f'{BASE}/api/user/users/', headers=teacher_auth)
u9b_detail = ''
if r.status_code == 200:
u9b_data = r.json()
u9b_results = u9b_data.get('results', u9b_data)
u9b_detail = f'count={len(u9b_results)}(should=1 student only)'
log('U9-b', 'GET', '/api/user/users/ (teacher)', 200, r.status_code, u9b_detail,
req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
# U9-c: 普通用户(当前测试用户)获取列表 → 403
r = s.get(f'{BASE}/api/user/users/', headers=auth)
log('U9-c', 'GET', '/api/user/users/ (normal)', 403, r.status_code,
f'code={r.json().get("code","")}' if r.status_code == 403 else '',
req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
# U10: 管理员查看学生详情
# 先获取 student id
student_id = django_eval(
f'from apps.user.models import User; '
f'u = User.objects.get(phone="{STUDENT_PHONE}"); print(u.id)'
)
r = s.get(f'{BASE}/api/user/users/{student_id}/', headers=admin_auth)
log('U10', 'GET', f'/api/user/users/{student_id}/', 200, r.status_code,
f'real_name={r.json().get("real_name","")}' if r.status_code == 200 else '',
resp_body=r.json())
# U10-b: 教师查看名下学生详情
r = s.get(f'{BASE}/api/user/users/{student_id}/', headers=teacher_auth)
log('U10-b', 'GET', f'/api/user/users/{student_id}/ (teacher)', 200, r.status_code,
resp_body=r.json())
# U10-c: 普通用户查看自己详情
if test_user_id:
r = s.get(f'{BASE}/api/user/users/{test_user_id}/', headers=auth)
log('U10-c', 'GET', f'/api/user/users/{test_user_id}/ (self)', 200, r.status_code,
resp_body=r.json())
# 清理辅助用户
django_eval(
f'from apps.user.models import User; '
f'User.objects.filter(phone__in=["{ADMIN_PHONE}","{TEACHER_PHONE}","{STUDENT_PHONE}"]).delete(); '
f'print("cleaned")'
)
# U7: 退出登录 (logout) — 放最后,因为会吊销 refresh
u7_body = {'refresh': refresh}
r = s.post(f'{BASE}/api/user/auth/logout/', json=u7_body, headers=auth)
log('U7', 'POST', '/api/user/auth/logout/', 200, r.status_code,
req_body=u7_body, resp_body=r.json())
# ═══════════════════════════════════════════════════════════════════════════════
section('病例端接口 (C1-C5)')
# ═══════════════════════════════════════════════════════════════════════════════
# 重新登录(logout 吊销了上一个 refresh
time.sleep(1.2)
2026-06-05 15:36:31 +08:00
r = s.post(f'{BASE}/api/user/auth/login/', json={'account': PHONE, 'password': FINAL_PASSWORD, 'role': CMS_ROLE})
2026-05-29 15:58:00 +08:00
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
auth = {'Authorization': f'Bearer {access}'}
# 确保机构与科室存在(与 U2/U4 同一 institution_code
2026-05-29 15:58:00 +08:00
django_eval(
f'from apps.user.models import Institution, Department; '
f'inst, _ = Institution.objects.get_or_create(code="{INST_CODE}", '
f' defaults={{"name":"{INST_NAME}","type":"hospital","province":"北京","city":"北京"}}); '
f'Department.objects.get_or_create(name="{DEPT_NAME}", '
f' defaults={{"category":"临床"}}); '
f'print("OK")'
2026-05-29 15:58:00 +08:00
)
# C1: PDF 解析 — 使用项目真实 PDF 文件
REAL_PDF = r'D:\01Agent\medical_training\儿科 病例样例(SOAP+循证).pdf'
with open(REAL_PDF, 'rb') as f:
r = s.post(
f'{BASE}/api/case/cases/parse-pdf/',
files={'files': ('儿科 病例样例(SOAP+循证).pdf', f, 'application/pdf')},
data={'case_type': 'traditional'},
headers=auth,
)
c1_ok = [200, 500, 429] # 200=AI 解析成功, 500=AI 异常, 429=限流
detail = ''
c1_resp = None
if r.headers.get('content-type', '').startswith('application/json'):
body = r.json()
c1_resp = body
detail = f'code={body.get("code", "")}'
if r.status_code == 200:
detail = f'parse_id={body.get("parse_id","")}, keys={list(body.get("data",{}).keys())[:5]}'
log('C1', 'POST', '/api/case/cases/parse-pdf/', c1_ok, r.status_code, detail,
req_body={'case_type': 'traditional', 'files': '<PDF file>'}, resp_body=c1_resp)
# C2: 生成评分规则 — 如果 C1 成功则用其返回的 data,否则用手工载荷
c1_data = None
if r.status_code == 200:
c1_data = body.get('data', {})
if c1_data:
c1_exam = c1_data.get('exam_items') or []
_write_log(
f' [INFO] C1 解析 exam_items: count={len(c1_exam)}, '
f'codes={[x.get("item_code") for x in c1_exam if isinstance(x, dict)]}'
)
2026-05-29 15:58:00 +08:00
c2_payload = c1_data if c1_data else {
'title': 'Swagger测试病例',
'case_type': 'traditional',
'chief_complaint': '发热3天',
'description': '患儿男4岁发热3天就诊',
'traditional': {
'standard_diagnosis': '上呼吸道感染',
'standard_treatment': '对症治疗',
},
}
r = s.post(f'{BASE}/api/case/cases/generate-scoring-rules/', json=c2_payload, headers=auth)
c2_ok = [200, 500, 429]
detail = ''
scoring_rules_from_ai = None
c2_resp = None
if r.headers.get('content-type', '').startswith('application/json'):
c2_body = r.json()
c2_resp = c2_body
if r.status_code == 200:
scoring_rules_from_ai = c2_body.get('scoring_rules', [])
detail = f'generated={c2_body.get("generated","")}, rules={len(scoring_rules_from_ai)}'
else:
detail = f'code={c2_body.get("code", "")}'
log('C2', 'POST', '/api/case/cases/generate-scoring-rules/', c2_ok, r.status_code, detail,
req_body=c2_payload, resp_body=c2_resp)
# C3: full-create — 优先用 C1+C2 AI 结果,否则用手工载荷
if c1_data and scoring_rules_from_ai:
_write_log(' [INFO] C3 使用 C1 AI 解析 + C2 AI 评分规则组装载荷')
payload = {**c1_data}
payload['department_name'] = DEPT_NAME # C1 常返回「儿科」,库内重名会 400
2026-05-29 15:58:00 +08:00
payload['scoring_rules'] = scoring_rules_from_ai
else:
_write_log(' [INFO] C3 使用手工表单载荷(C1/C2 未全部成功)')
payload = {
'title': 'Swagger-Try-It-Out-病例',
'case_type': 'traditional',
'difficulty': 'medium',
'chief_complaint': '发热 3 天',
'description': '患儿,男,4 岁,因发热 3 天就诊。',
'patient_age': 4,
'patient_gender': 'male',
'department_name': DEPT_NAME,
2026-05-29 15:58:00 +08:00
'estimated_minutes': 30,
'osce_enabled': False,
'tags': '儿科,发热',
'traditional': {
'standard_diagnosis': '上呼吸道感染',
'standard_treatment': '对症治疗,退热处理',
'guideline_reference': '《儿科学》第 9 版',
},
'scoring_rules': [
{
'dimension': '诊断准确性',
'score_weight': 0.6,
'ai_auto_score': True,
'scoring_standard': '准确判断上呼吸道感染',
},
{
'dimension': '治疗方案',
'score_weight': 0.4,
'ai_auto_score': True,
'scoring_standard': '治疗方案合理',
},
],
'exam_items': SAMPLE_EXAM_ITEMS,
2026-05-29 15:58:00 +08:00
}
payload_exam_items = payload.get('exam_items') or []
_write_log(
f' [INFO] C3 提交 exam_items: count={len(payload_exam_items)}, '
f'codes={[x.get("item_code") for x in payload_exam_items if isinstance(x, dict)]}'
)
2026-05-29 15:58:00 +08:00
r = s.post(f'{BASE}/api/case/cases/full-create/', json=payload, headers=auth)
c3_resp = r.json() if r.headers.get('content-type', '').startswith('application/json') else None
log('C3', 'POST', '/api/case/cases/full-create/', 201, r.status_code,
req_body=payload, resp_body=c3_resp)
case_id = None
expected_exam_count = len(payload_exam_items)
2026-05-29 15:58:00 +08:00
if r.status_code == 201:
case_id = r.json()['case']['id']
resp_exam = (c3_resp or {}).get('exam_items', [])
resp_exam_count = len(resp_exam)
log(
'C3-exam', 'CHECK', f'/api/case/cases/{case_id}/ (resp exam_items)',
expected_exam_count, resp_exam_count,
f'codes={[e.get("item_code") for e in resp_exam]}',
resp_body={'exam_items': resp_exam} if resp_exam else None,
)
db_exam_count = count_case_exam_items(case_id)
db_codes = list_case_exam_item_codes(case_id)
log(
'C3-db', 'CHECK', f'case_exam_item case_id={case_id}',
expected_exam_count, db_exam_count,
f'db_codes={db_codes}',
)
else:
_write_log(' SKIP C3-exam/C3-db — C3 未成功')
2026-05-29 15:58:00 +08:00
# C4: GET full
if case_id:
r = s.get(f'{BASE}/api/case/cases/{case_id}/full/', headers=auth)
c4_detail = ''
c4_resp = None
if r.headers.get('content-type', '').startswith('application/json'):
c4_resp = r.json()
c4_exam = c4_resp.get('exam_items', [])
c4_detail = f'exam_items={len(c4_exam)}(expect={expected_exam_count})'
2026-05-29 15:58:00 +08:00
log('C4', 'GET', f'/api/case/cases/{case_id}/full/', 200, r.status_code,
c4_detail, resp_body=c4_resp)
if c4_resp is not None:
log(
'C4-exam', 'CHECK', f'/api/case/cases/{case_id}/full/ exam_items',
expected_exam_count, len(c4_resp.get('exam_items', [])),
f'codes={[e.get("item_code") for e in c4_resp.get("exam_items", [])]}',
)
2026-05-29 15:58:00 +08:00
else:
_write_log(' SKIP C4 — C3 未返回 case_id')
# C5: PATCH full
if case_id:
c5_body = {'title': 'Swagger-更新标题'}
r = s.patch(f'{BASE}/api/case/cases/{case_id}/full/', json=c5_body, headers=auth)
log('C5', 'PATCH', f'/api/case/cases/{case_id}/full/', 200, r.status_code,
req_body=c5_body,
resp_body=r.json() if r.headers.get('content-type', '').startswith('application/json') else None)
else:
_write_log(' SKIP C5 — C3 未返回 case_id')
# ═══════════════════════════════════════════════════════════════════════════════
section('汇总')
# ═══════════════════════════════════════════════════════════════════════════════
total = len(results)
passed = sum(1 for r in results if r[5] == PASS)
failed = sum(1 for r in results if r[5] == FAIL)
_write_log(f'\n 总计: {total} 个接口 | 通过: {passed} | 失败: {failed}')
if failed:
_write_log('\n 失败接口:')
for r in results:
if r[5] == FAIL:
_write_log(f' {r[0]} {r[1]} {r[2]} -- expect={r[3]}, got={r[4]}')
_write_log(f'\n 日志文件: {LOG_FILE}')
_log_fh.close()
sys.exit(1)
else:
_write_log('\n ALL PASSED - 全部接口 Swagger Try-it-out 验证通过!')
_write_log(f'\n 日志文件: {LOG_FILE}')
_log_fh.close()
sys.exit(0)