Files
medical_training/test/swagger_tryout.py
T
2026-06-05 15:36:31 +08:00

716 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Swagger Try-it-out 等效脚本:逐个调用所有接口,验证可达性和基本功能。
运行方式:.venv\\Scripts\\python.exe test/swagger_tryout.py
前提:Django dev server 已在 http://127.0.0.1:8000 运行,Redis 已启动;.env 指向目标库。
认证流程(与当前 API 一致):
U1 发码(login) → U2 管理员代注册(机构编码+名称, 默认密码 Pass+手机号)
→ U3 密码登录 → U4 验证码登录(机构信息) → U4-new 新号自动注册
→ U5 重置密码 → U6 改密 → U8 刷新 → /me → U9/U10 → U7 退出
病例段:C1 解析 exam_items → C3 full-create 写入 case_exam_item(校验响应条数与库表条数)
日志输出:logs/test-swagger-YYYY-MM-DD.log(含完整请求体和响应体)
"""
import io
import json
import sys
import time
import subprocess
from datetime import datetime
from pathlib import Path
import requests
# Work around the Windows GBK console encoding issue by forcing UTF-8 on both streams.
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
# Base URL of the server every request in this script is sent to.
BASE = 'http://127.0.0.1:8000'
# Interpreter and working directory used when spawning helper subprocesses.
PYTHON = r'D:\01Agent\medical_training\.venv\Scripts\python.exe'
CWD = r'D:\01Agent\medical_training'
# Status labels stored with each recorded check.
PASS = 'PASS'
FAIL = 'FAIL'
# Accumulates one (api_id, method, url, expected, actual, status) tuple per recorded check.
results = []
# ─── Log file ────────────────────────────────────────────────────────────────
# One log file per calendar day under <CWD>/logs; opened in append mode so
# several runs on the same day end up in the same file.
LOGS_DIR = Path(CWD) / 'logs'
LOGS_DIR.mkdir(exist_ok=True)
LOG_FILE = LOGS_DIR / f'test-swagger-{datetime.now().strftime("%Y-%m-%d")}.log'
_log_fh = open(LOG_FILE, 'a', encoding='utf-8')
def _write_log(text):
    """Write *text* to the log file (newline-terminated, flushed) and echo it to the console."""
    line = ''.join((text, '\n'))
    _log_fh.write(line)
    _log_fh.flush()
    print(text)
def _format_headers(headers):
"""将 requests 的 headers 格式化为 JSON 字符串。"""
if not headers:
return ''
return json.dumps(dict(headers), ensure_ascii=False)
def _format_body(body):
    """Render a request/response body for the log file.

    dicts and lists are pretty-printed as JSON (non-ASCII kept readable);
    anything else, or a container json cannot serialise, falls back to str().
    """
    if isinstance(body, (dict, list)):
        try:
            return json.dumps(body, ensure_ascii=False, indent=2)
        except (TypeError, ValueError):
            # Logging must never abort the test run; fall through to str().
            pass
    return str(body)


def log(api_id, method, url, expected, actual, detail='',
        req_headers=None, req_body=None, resp_headers=None, resp_body=None):
    """Record one check and write it to the console and the log file.

    Args:
        api_id: Short identifier of the check (e.g. ``'U1'``).
        method: HTTP method or ``'CHECK'`` for non-HTTP assertions.
        url: Endpoint or description shown in the summary line.
        expected: A single accepted value, or a list/tuple of accepted values.
        actual: The observed value; the check passes when it is accepted.
        detail: Free-form text appended to the summary line.
        req_headers, req_body, resp_headers, resp_body: Optional request and
            response data. When given they are written to the log file only
            (not the console). Response bodies longer than 2000 characters
            are truncated.

    Side effects:
        Appends ``(api_id, method, url, expected, actual, status)`` to the
        module-level ``results`` list.
    """
    accepted = expected if isinstance(expected, (list, tuple)) else [expected]
    status = PASS if actual in accepted else FAIL
    results.append((api_id, method, url, expected, actual, status))
    exp_str = str(expected)
    _write_log(f' {status} {api_id:<6} {method:<6} {url:<50} expect={exp_str:<20} got={actual} {detail}')
    # Detailed request/response data goes to the log file only, to keep console output short.
    if req_headers is not None:
        _log_fh.write(f' >>> headers: {_format_headers(req_headers)}\n')
    if req_body is not None:
        _log_fh.write(f' >>> body: {_format_body(req_body)}\n')
    if resp_headers is not None:
        _log_fh.write(f' <<< headers: {_format_headers(resp_headers)}\n')
    if resp_body is not None:
        body_str = _format_body(resp_body)
        # Truncate overly long responses, noting the original length.
        if len(body_str) > 2000:
            body_str = body_str[:2000] + f'... (truncated, {len(body_str)} chars total)'
        _log_fh.write(f' <<< body: {body_str}\n')
    _log_fh.flush()
def section(title):
    """Log a section banner: *title* framed by two 90-character rules of '='."""
    rule = '=' * 90
    _write_log(f'\n{rule}\n {title}\n{rule}')
def django_eval(code):
    """Run a snippet of Django code in a separate process and return its stdout.

    The snippet is appended to a preamble that sets DJANGO_SETTINGS_MODULE
    (if unset) and calls ``django.setup()``, then executed with
    ``PYTHON -c`` from the ``CWD`` directory.

    Args:
        code: Python source, written as ``;``-separated statements.

    Returns:
        The child's stdout with surrounding whitespace stripped. An empty
        string is returned when the child printed nothing or its output
        could not be captured.
    """
    preamble = (
        'import django, os; '
        'os.environ.setdefault("DJANGO_SETTINGS_MODULE","config.settings"); '
        'django.setup(); '
    )
    proc = subprocess.run(
        [PYTHON, '-c', preamble + code],
        capture_output=True, text=True, cwd=CWD,
    )
    if proc.returncode != 0:
        # A failing snippet used to be indistinguishable from "no output";
        # report it on stderr so broken fixtures are visible, but keep the
        # best-effort contract (callers still receive whatever was printed).
        print(
            f'[django_eval] exit code {proc.returncode}: {(proc.stderr or "").strip()}',
            file=sys.stderr,
        )
    return (proc.stdout or '').strip()
def get_sms_code(phone, scene):
    """Fetch the SMS verification code for *phone*/*scene* from the Django cache.

    Returns the code as a string, or None when nothing is stored.
    """
    cache_key = f'sms:{scene}:{phone}'
    fetched = django_eval(
        f'from django.core.cache import cache; print(cache.get("{cache_key}"))'
    )
    # The child prints the literal text "None" when the key is absent.
    if not fetched or fetched == 'None':
        return None
    return fetched
def count_case_exam_items(case_id):
    """Return how many CaseExamItem rows exist for *case_id* (0 if the count cannot be read)."""
    statements = (
        'from apps.case.models import CaseExamItem',
        f'print(CaseExamItem.objects.filter(case_id={case_id}).count())',
    )
    raw = django_eval('; '.join(statements))
    if raw and raw.isdigit():
        return int(raw)
    return 0
def list_case_exam_item_codes(case_id):
    """Return the stored item_code values for *case_id* as one comma-separated string.

    Rows are ordered by display_order, then id. Yields '' when nothing is returned.
    """
    statements = (
        'from apps.case.models import CaseExamItem',
        f'codes = list(CaseExamItem.objects.filter(case_id={case_id}).order_by("display_order", "id").values_list("item_code", flat=True))',
        'print(",".join(codes))',
    )
    output = django_eval('; '.join(statements))
    return output if output else ''
def inject_sms_code(phone, scene, code='123456'):
    """Store *code* in the Django cache as the SMS code for *phone*/*scene* (300 s timeout) and return it."""
    cache_key = f'sms:{scene}:{phone}'
    django_eval(
        f'from django.core.cache import cache; cache.set("{cache_key}", "{code}", 300)'
    )
    return code
# ─── Clear leftover Redis data (rate-limit counters etc.) ───────────────────
print('\n[准备] 清理 Redis 缓存...')
django_eval('from django.core.cache import cache; cache.clear(); print("OK")')
# Test accounts; users possibly left over from a previous run are removed once these are defined.
SUPER_PHONE = '13700000090'  # super admin; performs the U2 on-behalf registration
SUPER_PWD = 'Super123'
PHONE = '13700000099'  # CMS user (doctor); uses the U3 password login
STUDENT_PHONE_U4 = '13700000097'  # pre-registered student; uses the U4 code login (non-trial institutions require a matching institution)
PHONE_ALT = '13700000098'  # reserved for U4 trial-institution auto-registration
INST_CODE = 'SWAG_TEST_HOSP'
INST_NAME = 'Swagger测试医院'
# Reserved trial institution (per the original note: keep in sync with the constants in apps.user.auth)
TRIAL_INST_CODE = 'PKU_LAB_TRIAL'
TRIAL_INST_NAME = '北大医学部(实验室)试用'
CMS_ROLE = 'doctor'  # CMS role of the PHONE user
# Globally unique department name, so that resolve_department("儿科") cannot match
# several rows and fail with CASE_DEPARTMENT_AMBIGUOUS.
DEPT_NAME = 'Swagger儿科'
# Exam items used by the hand-written C3 fallback payload
# (when C1 returns a parse result, the AI-produced exam_items are preferred).
SAMPLE_EXAM_ITEMS = [
    {
        'item_code': 'blood_routine',
        'item_name': '血常规',
        'item_type': 'lab',
        'category': '实验室检查',
        'result_text': 'WBC 10×10^9/L',
        'result_structured': {'wbc': '10×10^9/L'},
        'is_key': True,
        'is_abnormal': False,
        'score_weight': 1.0,
        'display_order': 1,
    },
    {
        'item_code': 'crp',
        'item_name': 'CRP',
        'item_type': 'lab',
        'category': '实验室检查',
        'result_text': 'CRP 8 mg/L',
        'is_key': False,
        'is_abnormal': False,
        'score_weight': 1.0,
        'display_order': 2,
    },
]
# Remove any test users left behind by an earlier run.
django_eval(
    f'from apps.user.models import User; '
    f'User.objects.filter(phone__in=["{SUPER_PHONE}","{PHONE}","{STUDENT_PHONE_U4}","{PHONE_ALT}"]).delete(); print("cleaned")'
)
# Pre-create the super admin (the U2 on-behalf registration requires an admin identity).
django_eval(
    f'from apps.user.models import User; '
    f'User.objects.create_user(username="{SUPER_PHONE}", password="{SUPER_PWD}", '
    f' phone="{SUPER_PHONE}", real_name="Swagger超管", role_type="super_admin", status=1); '
    f'print("super ok")'
)
# Pre-create the trial institution (adds data only, no schema change).
django_eval(
    f'from apps.user.models import Institution; '
    f'Institution.objects.get_or_create(code="{TRIAL_INST_CODE}", '
    f' defaults={{"name":"{TRIAL_INST_NAME}","type":"hospital"}}); print("trial ok")'
)
print('[准备] 完成\n')
# Single HTTP session reused for every request in the script.
s = requests.Session()
# Default password assigned by the U2 on-behalf registration: "Pass" + phone number.
PASSWORD = f'Pass{PHONE}'
def _institution_fields():
    """Return the institution_code / institution_name pair of the test institution as a dict."""
    return dict(institution_code=INST_CODE, institution_name=INST_NAME)
def get_user_access_token(phone):
    """Issue an access token directly for the user with *phone*.

    Used for roles that cannot go through the U3 CMS login (e.g. teacher).
    Returns whatever the helper process printed, i.e. the token string.
    """
    statements = (
        'from apps.user.models import User',
        'from rest_framework_simplejwt.tokens import RefreshToken',
        f'u=User.objects.get(phone="{phone}")',
        'print(str(RefreshToken.for_user(u).access_token))',
    )
    return django_eval('; '.join(statements))
# ═══════════════════════════════════════════════════════════════════════════════
section('用户端接口 (U1-U10)')
# ═══════════════════════════════════════════════════════════════════════════════
# Token state shared by the following steps; populated after a successful login.
access = ''
refresh = ''
auth = {}
# INST-LIST: mobile institution list (not paginated, callable before login)
r = s.get(f'{BASE}/api/user/institution_list/')
inst_list_detail = ''
if r.status_code == 200:
    items = r.json()
    # Entries flagged is_trial are counted separately for the summary line.
    trial_flags = [i for i in items if i.get('is_trial')]
    inst_list_detail = f'count={len(items)}, trial={len(trial_flags)}'
log('INST-LIST', 'GET', '/api/user/institution_list/', 200, r.status_code, inst_list_detail,
    req_headers=r.request.headers, resp_headers=dict(r.headers),
    resp_body=r.json() if r.headers.get('content-type', '').startswith('application/json') else None)
# U1: send verification code (login scene)
u1_body = {'phone': PHONE, 'scene': 'login'}
r = s.post(f'{BASE}/api/user/auth/send-code/', json=u1_body)
log('U1', 'POST', '/api/user/auth/send-code/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u1_body,
    resp_headers=dict(r.headers), resp_body=r.json())
# U2: on-behalf registration by an admin (super admin / hospital admin only).
# The super admin creates a user with the CMS role "doctor"; per the original
# note the institution is created automatically by the endpoint.
super_access = get_user_access_token(SUPER_PHONE)
super_auth = {'Authorization': f'Bearer {super_access}'}
u2_body = {
    'phone': PHONE,
    'real_name': 'Swagger测试',
    'role_type': CMS_ROLE,
    **_institution_fields(),
}
r = s.post(f'{BASE}/api/user/auth/register/', json=u2_body, headers=super_auth)
# Parse the body once. A non-JSON reply (e.g. an HTML error page) becomes None
# so the check is logged as a failure instead of raising inside the log() call.
u2_json = r.json() if r.headers.get('content-type', '').startswith('application/json') else None
u2_has_tokens = 'tokens' in (u2_json or {})
u2_detail = 'has_tokens=%s' % u2_has_tokens
log('U2', 'POST', '/api/user/auth/register/ (super_admin)', 201, r.status_code, u2_detail,
    req_headers=r.request.headers, req_body=u2_body,
    resp_headers=dict(r.headers), resp_body=u2_json)
# U2-tok: on-behalf registration must not return tokens (admin-registers-user semantics)
log('U2-tok', 'CHECK', 'register response has no tokens', False, u2_has_tokens)
# U2-neg1: on-behalf registration without login → 401
r = s.post(f'{BASE}/api/user/auth/register/', json=u2_body)
u2_neg1_json = r.json() if r.headers.get('content-type', '').startswith('application/json') else None
log('U2-neg1', 'POST', '/api/user/auth/register/ (anon)', 401, r.status_code,
    f'code={u2_neg1_json.get("code","")}' if u2_neg1_json is not None else "",
    req_body={'phone': PHONE, '...': '...'}, resp_body=u2_neg1_json)
# U3: CMS password login (account + password + role, all three required)
u3_body = {'account': PHONE, 'password': PASSWORD, 'role': CMS_ROLE}
r = s.post(f'{BASE}/api/user/auth/login/', json=u3_body)
log('U3', 'POST', '/api/user/auth/login/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u3_body,
    resp_headers=dict(r.headers), resp_body=r.json())
# NOTE(review): the source's indentation was lost; all four assignments are
# assumed to belong to the success branch — confirm against the original file.
if r.status_code == 200:
    tokens = r.json().get('tokens', {})
    access = tokens.get('access', '')
    refresh = tokens.get('refresh', '')
    auth = {'Authorization': f'Bearer {access}'}
# U3-neg: missing role → 400 AUTH_BAD_CREDENTIALS
r = s.post(f'{BASE}/api/user/auth/login/', json={'account': PHONE, 'password': PASSWORD})
# The password is masked in the logged request body.
log('U3-neg', 'POST', '/api/user/auth/login/ (no role)', 400, r.status_code,
    f'code={r.json().get("code","")}', req_body={'account': PHONE, 'password': '***'},
    resp_body=r.json())
# U2-neg2: non-admin (doctor) attempting on-behalf registration → 403 USER_NO_REGISTER_PERMISSION
r = s.post(f'{BASE}/api/user/auth/register/',
           json={'phone': '13700000095', 'real_name': '越权注册', 'role_type': 'student',
                 **_institution_fields()}, headers=auth)
log('U2-neg2', 'POST', '/api/user/auth/register/ (doctor)', 403, r.status_code,
    f'code={r.json().get("code","")}' if r.headers.get("content-type","").startswith("application/json") else "",
    resp_body=r.json() if r.headers.get('content-type','').startswith('application/json') else None)
# U2-ha / U2-ha-neg: a hospital admin may only register users inside their own institution
HA_PHONE = '13700000094'
HA_STUDENT = '13700000093'
# Create the hospital admin directly in the database, attached to the test institution.
django_eval(
    f'from apps.user.models import User, Institution; '
    f'inst=Institution.objects.get(code="{INST_CODE}"); '
    f'User.objects.filter(phone__in=["{HA_PHONE}","{HA_STUDENT}"]).delete(); '
    f'User.objects.create_user(username="{HA_PHONE}", password="Hosp1234", phone="{HA_PHONE}", '
    f' real_name="Swagger医院管理员", role_type="hospital_admin", institution=inst, status=1); '
    f'print("hadmin ok")'
)
ha_auth = {'Authorization': f'Bearer {get_user_access_token(HA_PHONE)}'}
# U2-ha: create a student in the admin's own institution → 201
ha_body = {'phone': HA_STUDENT, 'real_name': '本院学生', 'role_type': 'student', **_institution_fields()}
r = s.post(f'{BASE}/api/user/auth/register/', json=ha_body, headers=ha_auth)
log('U2-ha', 'POST', '/api/user/auth/register/ (hospital_admin own inst)', 201, r.status_code,
    req_body=ha_body, resp_body=r.json() if r.headers.get('content-type','').startswith('application/json') else None)
# U2-ha-neg: create an account in another institution → 403 USER_INSTITUTION_SCOPE_FORBIDDEN
ha_neg_body = {'phone': '13700000092', 'real_name': '跨院学生', 'role_type': 'student',
               'institution_code': 'SWAG_OTHER_HOSP', 'institution_name': 'Swagger其它医院'}
r = s.post(f'{BASE}/api/user/auth/register/', json=ha_neg_body, headers=ha_auth)
log('U2-ha-neg', 'POST', '/api/user/auth/register/ (hospital_admin cross inst)', 403, r.status_code,
    f'code={r.json().get("code","")}' if r.headers.get("content-type","").startswith("application/json") else "",
    req_body=ha_neg_body, resp_body=r.json() if r.headers.get('content-type','').startswith('application/json') else None)
# Remove the helper accounts created for this group.
django_eval(f'from apps.user.models import User; User.objects.filter(phone__in=["{HA_PHONE}","{HA_STUDENT}"]).delete()')
# U4: verification-code login (mobile; pre-registered student + matching institution → 200)
# Pre-create the student with the same institution as the one selected at login.
django_eval(
    f'from apps.user.models import User, Institution; '
    f'inst=Institution.objects.get(code="{INST_CODE}"); '
    f'User.objects.filter(phone="{STUDENT_PHONE_U4}").delete(); '
    f'User.objects.create_user(username="{STUDENT_PHONE_U4}", password=None, '
    f' phone="{STUDENT_PHONE_U4}", real_name="Swagger学生U4", role_type="student", '
    f' institution=inst, status=1); print("student ok")'
)
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': STUDENT_PHONE_U4, 'scene': 'login'})
# Use the code stored by send-code if present, otherwise inject a known one.
login_code = get_sms_code(STUDENT_PHONE_U4, 'login') or inject_sms_code(STUDENT_PHONE_U4, 'login')
u4_body = {'phone': STUDENT_PHONE_U4, 'code': login_code, 'institution_code': INST_CODE}
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_body)
u4_detail = f'is_new_user={r.json().get("is_new_user")}' if r.status_code in (200, 201) else f'code={r.json().get("code","")}'
log('U4', 'POST', '/api/user/auth/login-code/ (registered student)', 200, r.status_code, u4_detail,
    req_headers=r.request.headers, req_body=u4_body,
    resp_headers=dict(r.headers), resp_body=r.json())
# U4-neg: non-trial institution + phone number not registered → 403 AUTH_NOT_REGISTERED
unreg_phone = '13700000096'
django_eval(f'from apps.user.models import User; User.objects.filter(phone="{unreg_phone}").delete()')
# inject_sms_code() defaults to '123456', which is the code sent below.
inject_sms_code(unreg_phone, 'login')
u4_neg_body = {'phone': unreg_phone, 'code': '123456', 'institution_code': INST_CODE}
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_neg_body)
log('U4-neg', 'POST', '/api/user/auth/login-code/ (unregistered, non-trial)', 403, r.status_code,
    f'code={r.json().get("code","")}', req_body=u4_neg_body, resp_body=r.json())
# U4-new: code login at the trial institution auto-registers (new phone → 201, is_new_user=true)
django_eval(f'from apps.user.models import User; User.objects.filter(phone="{PHONE_ALT}").delete()')
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': PHONE_ALT, 'scene': 'login'})
log('U4-pre', 'POST', '/api/user/auth/send-code/ (alt)', 200, r.status_code,
    req_body={'phone': PHONE_ALT, 'scene': 'login'})
alt_code = get_sms_code(PHONE_ALT, 'login') or inject_sms_code(PHONE_ALT, 'login')
u4_new_body = {'phone': PHONE_ALT, 'code': alt_code, 'institution_code': TRIAL_INST_CODE}
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_new_body)
u4_new_detail = ''
if r.status_code in (200, 201):
    u4_new_detail = f'is_new_user={r.json().get("is_new_user")}'
log('U4-new', 'POST', '/api/user/auth/login-code/ (trial auto-register)', 201, r.status_code,
    u4_new_detail, req_body=u4_new_body, resp_body=r.json() if r.headers.get('content-type', '').startswith('application/json') else None)
# Remove the accounts used by the U4 checks.
django_eval(f'from apps.user.models import User; User.objects.filter(phone__in=["{PHONE_ALT}","{STUDENT_PHONE_U4}","{unreg_phone}"]).delete()')
# U5: reset password
NEW_PASSWORD = 'SwagNew1'
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': PHONE, 'scene': 'reset'})
reset_code = get_sms_code(PHONE, 'reset')
if not reset_code:
    # No code found in the cache: inject a known one instead.
    reset_code = inject_sms_code(PHONE, 'reset', '111111')
u5_body = {'phone': PHONE, 'code': reset_code, 'new_password': NEW_PASSWORD}
r = s.post(f'{BASE}/api/user/auth/reset-password/', json=u5_body)
log('U5', 'POST', '/api/user/auth/reset-password/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u5_body,
    resp_headers=dict(r.headers), resp_body=r.json())
# Per the original note: reset-password calls invalidate_user_tokens(time()+1),
# so wait more than 1 s before logging in again.
time.sleep(1.2)
# Log in again with the new password (CMS)
r = s.post(f'{BASE}/api/user/auth/login/', json={'account': PHONE, 'password': NEW_PASSWORD, 'role': CMS_ROLE})
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}
# U6: change password
FINAL_PASSWORD = 'SwagFin1'
u6_body = {'old_password': NEW_PASSWORD, 'new_password': FINAL_PASSWORD}
r = s.post(f'{BASE}/api/user/users/change-password/', json=u6_body, headers=auth)
log('U6', 'POST', '/api/user/users/change-password/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u6_body,
    resp_headers=dict(r.headers), resp_body=r.json())
# Per the original note: change-password likewise calls invalidate_user_tokens(time()+1).
time.sleep(1.2)
# Log in again with the final password (CMS)
r = s.post(f'{BASE}/api/user/auth/login/', json={'account': PHONE, 'password': FINAL_PASSWORD, 'role': CMS_ROLE})
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}
# U8: refresh token
u8_body = {'refresh': refresh}
r = s.post(f'{BASE}/api/user/auth/refresh/', json=u8_body)
log('U8', 'POST', '/api/user/auth/refresh/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u8_body,
    resp_headers=dict(r.headers), resp_body=r.json())
# NOTE(review): indentation was lost in the source; both assignments are assumed
# to run only on a successful refresh — confirm against the original file.
if r.status_code == 200:
    # Keep the previous access token when the response carries none.
    access = r.json().get('access', access)
    auth = {'Authorization': f'Bearer {access}'}
# /me: GET the current user's profile
r = s.get(f'{BASE}/api/user/users/me/', headers=auth)
log('/me', 'GET', '/api/user/users/me/', 200, r.status_code,
    f'phone={r.json().get("phone","")}' if r.status_code == 200 else '',
    req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
# Remembered for the U10-c "view own detail" check.
test_user_id = r.json().get('id') if r.status_code == 200 else None
# ── U9/U10: user list + user detail ─────────────────────────────────────────
# Create admin, teacher and student users plus a teacher–student relation.
ADMIN_PHONE = '13700000088'
TEACHER_PHONE = '13700000077'
STUDENT_PHONE = '13700000066'
ROLE_PWD = 'RoleTest1'
django_eval(
    f'from apps.user.models import User, TeacherStudentRelation; '
    f'User.objects.filter(phone__in=["{ADMIN_PHONE}","{TEACHER_PHONE}","{STUDENT_PHONE}"]).delete(); '
    f'admin = User.objects.create_user(username="{ADMIN_PHONE}", password="{ROLE_PWD}", '
    f' phone="{ADMIN_PHONE}", real_name="Swagger管理员", role_type="super_admin", status=1); '
    f'teacher = User.objects.create_user(username="{TEACHER_PHONE}", password="{ROLE_PWD}", '
    f' phone="{TEACHER_PHONE}", real_name="Swagger教师", role_type="teacher", status=1); '
    f'student = User.objects.create_user(username="{STUDENT_PHONE}", password="{ROLE_PWD}", '
    f' phone="{STUDENT_PHONE}", real_name="Swagger学生", role_type="student", status=1); '
    f'TeacherStudentRelation.objects.create(teacher=teacher, student=student, '
    f' relation_type="指导", status=1); '
    f'print(f"admin={{admin.id}} teacher={{teacher.id}} student={{student.id}}")'
)
# Admin login (CMS, role super_admin)
r = s.post(f'{BASE}/api/user/auth/login/',
           json={'account': ADMIN_PHONE, 'password': ROLE_PWD, 'role': 'super_admin'})
admin_tokens = r.json().get('tokens', {})
admin_auth = {'Authorization': f'Bearer {admin_tokens.get("access", "")}'}
admin_refresh = admin_tokens.get('refresh', '')
# U9: admin fetches the user list
r = s.get(f'{BASE}/api/user/users/', headers=admin_auth)
u9_detail = ''
if r.status_code == 200:
    u9_data = r.json()
    # Use the 'results' entry when present, otherwise the body itself.
    u9_results = u9_data.get('results', u9_data)
    u9_detail = f'count={len(u9_results)}'
log('U9', 'GET', '/api/user/users/', 200, r.status_code, u9_detail,
    req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
# U9-b: teacher fetches the user list (only their own students)
# The teacher role is not among the CMS login roles (U3 is CMS-only), so a token
# is issued directly to demonstrate the permission behaviour.
teacher_access = get_user_access_token(TEACHER_PHONE)
teacher_auth = {'Authorization': f'Bearer {teacher_access}'}
r = s.get(f'{BASE}/api/user/users/', headers=teacher_auth)
u9b_detail = ''
if r.status_code == 200:
    u9b_data = r.json()
    u9b_results = u9b_data.get('results', u9b_data)
    u9b_detail = f'count={len(u9b_results)}(should=1 student only)'
log('U9-b', 'GET', '/api/user/users/ (teacher)', 200, r.status_code, u9b_detail,
    req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
# U9-c: ordinary user (the current test user) fetches the list → 403
r = s.get(f'{BASE}/api/user/users/', headers=auth)
log('U9-c', 'GET', '/api/user/users/ (normal)', 403, r.status_code,
    f'code={r.json().get("code","")}' if r.status_code == 403 else '',
    req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
# U10: admin views the student's detail
# Look up the student's id first.
student_id = django_eval(
    f'from apps.user.models import User; '
    f'u = User.objects.get(phone="{STUDENT_PHONE}"); print(u.id)'
)
r = s.get(f'{BASE}/api/user/users/{student_id}/', headers=admin_auth)
log('U10', 'GET', f'/api/user/users/{student_id}/', 200, r.status_code,
    f'real_name={r.json().get("real_name","")}' if r.status_code == 200 else '',
    resp_body=r.json())
# U10-b: teacher views the detail of a student under their supervision
r = s.get(f'{BASE}/api/user/users/{student_id}/', headers=teacher_auth)
log('U10-b', 'GET', f'/api/user/users/{student_id}/ (teacher)', 200, r.status_code,
    resp_body=r.json())
# U10-c: ordinary user views their own detail (skipped when /me did not yield an id)
if test_user_id:
    r = s.get(f'{BASE}/api/user/users/{test_user_id}/', headers=auth)
    log('U10-c', 'GET', f'/api/user/users/{test_user_id}/ (self)', 200, r.status_code,
        resp_body=r.json())
# Clean up the helper users
django_eval(
    f'from apps.user.models import User; '
    f'User.objects.filter(phone__in=["{ADMIN_PHONE}","{TEACHER_PHONE}","{STUDENT_PHONE}"]).delete(); '
    f'print("cleaned")'
)
# U7: logout — placed last because it revokes the refresh token
u7_body = {'refresh': refresh}
r = s.post(f'{BASE}/api/user/auth/logout/', json=u7_body, headers=auth)
log('U7', 'POST', '/api/user/auth/logout/', 200, r.status_code,
    req_body=u7_body, resp_body=r.json())
# ═══════════════════════════════════════════════════════════════════════════════
section('病例端接口 (C1-C5)')
# ═══════════════════════════════════════════════════════════════════════════════
# Log in again (logout revoked the previous refresh token)
time.sleep(1.2)
r = s.post(f'{BASE}/api/user/auth/login/', json={'account': PHONE, 'password': FINAL_PASSWORD, 'role': CMS_ROLE})
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
auth = {'Authorization': f'Bearer {access}'}
# Make sure the institution and department exist (same institution_code as U2/U4)
django_eval(
    f'from apps.user.models import Institution, Department; '
    f'inst, _ = Institution.objects.get_or_create(code="{INST_CODE}", '
    f' defaults={{"name":"{INST_NAME}","type":"hospital","province":"北京","city":"北京"}}); '
    f'Department.objects.get_or_create(name="{DEPT_NAME}", institution=inst, '
    f' defaults={{"category":"临床"}}); '
    f'print("OK")'
)
# C1: PDF parsing — uses a real PDF file from the project
REAL_PDF = r'D:\01Agent\medical_training\儿科 病例样例(SOAP+循证).pdf'
with open(REAL_PDF, 'rb') as f:
    # Multipart upload: the PDF under the 'files' field plus a case_type form field.
    r = s.post(
        f'{BASE}/api/case/cases/parse-pdf/',
        files={'files': ('儿科 病例样例(SOAP+循证).pdf', f, 'application/pdf')},
        data={'case_type': 'traditional'},
        headers=auth,
    )
c1_ok = [200, 500, 429]  # 200 = AI parse succeeded, 500 = AI error, 429 = rate limited
detail = ''
c1_resp = None
if r.headers.get('content-type', '').startswith('application/json'):
    body = r.json()
    c1_resp = body
    detail = f'code={body.get("code", "")}'
    if r.status_code == 200:
        # On success show the parse id and up to five keys of the parsed data.
        detail = f'parse_id={body.get("parse_id","")}, keys={list(body.get("data",{}).keys())[:5]}'
log('C1', 'POST', '/api/case/cases/parse-pdf/', c1_ok, r.status_code, detail,
    req_body={'case_type': 'traditional', 'files': '<PDF file>'}, resp_body=c1_resp)
# C2: generate scoring rules — use the data returned by C1 when it succeeded,
# otherwise a hand-written payload.
c1_data = None
if r.status_code == 200:
    # `body` was bound by the C1 step when the response was JSON.
    c1_data = body.get('data', {})
if c1_data:
    c1_exam = c1_data.get('exam_items') or []
    _write_log(
        f' [INFO] C1 解析 exam_items: count={len(c1_exam)}, '
        f'codes={[x.get("item_code") for x in c1_exam if isinstance(x, dict)]}'
    )
c2_payload = c1_data if c1_data else {
    'title': 'Swagger测试病例',
    'case_type': 'traditional',
    'chief_complaint': '发热3天',
    'description': '患儿男4岁发热3天就诊',
    'traditional': {
        'standard_diagnosis': '上呼吸道感染',
        'standard_treatment': '对症治疗',
    },
}
r = s.post(f'{BASE}/api/case/cases/generate-scoring-rules/', json=c2_payload, headers=auth)
# Same accepted statuses as C1.
c2_ok = [200, 500, 429]
detail = ''
scoring_rules_from_ai = None
c2_resp = None
if r.headers.get('content-type', '').startswith('application/json'):
    c2_body = r.json()
    c2_resp = c2_body
    if r.status_code == 200:
        scoring_rules_from_ai = c2_body.get('scoring_rules', [])
        detail = f'generated={c2_body.get("generated","")}, rules={len(scoring_rules_from_ai)}'
    else:
        detail = f'code={c2_body.get("code", "")}'
log('C2', 'POST', '/api/case/cases/generate-scoring-rules/', c2_ok, r.status_code, detail,
    req_body=c2_payload, resp_body=c2_resp)
# C3: full-create — prefer the C1 + C2 AI results, otherwise use a hand-written payload
if c1_data and scoring_rules_from_ai:
    _write_log(' [INFO] C3 使用 C1 AI 解析 + C2 AI 评分规则组装载荷')
    # Shallow copy so the C1 data itself is not modified.
    payload = {**c1_data}
    payload['department_name'] = DEPT_NAME  # C1 often returns "儿科"; a duplicate name in the DB would give a 400
    payload['scoring_rules'] = scoring_rules_from_ai
else:
    _write_log(' [INFO] C3 使用手工表单载荷(C1/C2 未全部成功)')
    payload = {
        'title': 'Swagger-Try-It-Out-病例',
        'case_type': 'traditional',
        'difficulty': 'medium',
        'chief_complaint': '发热 3 天',
        'description': '患儿,男,4 岁,因发热 3 天就诊。',
        'patient_age': 4,
        'patient_gender': 'male',
        'department_name': DEPT_NAME,
        'estimated_minutes': 30,
        'osce_enabled': False,
        'tags': '儿科,发热',
        'traditional': {
            'standard_diagnosis': '上呼吸道感染',
            'standard_treatment': '对症治疗,退热处理',
            'guideline_reference': '《儿科学》第 9 版',
        },
        'scoring_rules': [
            {
                'dimension': '诊断准确性',
                'score_weight': 0.6,
                'ai_auto_score': True,
                'scoring_standard': '准确判断上呼吸道感染',
            },
            {
                'dimension': '治疗方案',
                'score_weight': 0.4,
                'ai_auto_score': True,
                'scoring_standard': '治疗方案合理',
            },
        ],
        'exam_items': SAMPLE_EXAM_ITEMS,
    }
# The submitted exam items; their count is the expectation for the later checks.
payload_exam_items = payload.get('exam_items') or []
_write_log(
    f' [INFO] C3 提交 exam_items: count={len(payload_exam_items)}, '
    f'codes={[x.get("item_code") for x in payload_exam_items if isinstance(x, dict)]}'
)
r = s.post(f'{BASE}/api/case/cases/full-create/', json=payload, headers=auth)
c3_resp = r.json() if r.headers.get('content-type', '').startswith('application/json') else None
log('C3', 'POST', '/api/case/cases/full-create/', 201, r.status_code,
    req_body=payload, resp_body=c3_resp)
# C3 follow-up checks: the number of exam items in the response and in the
# database must both equal the number submitted.
case_id = None
expected_exam_count = len(payload_exam_items)
# Reuse the body parsed right after the request instead of calling r.json()
# again; a 201 whose body is not the expected JSON object is treated like a
# failed C3 rather than aborting the whole run with an exception.
c3_json = c3_resp if isinstance(c3_resp, dict) else {}
if r.status_code == 201:
    c3_case = c3_json.get('case')
    if isinstance(c3_case, dict):
        case_id = c3_case.get('id')
if case_id is not None:
    resp_exam = c3_json.get('exam_items', [])
    resp_exam_count = len(resp_exam)
    log(
        'C3-exam', 'CHECK', f'/api/case/cases/{case_id}/ (resp exam_items)',
        expected_exam_count, resp_exam_count,
        f'codes={[e.get("item_code") for e in resp_exam]}',
        resp_body={'exam_items': resp_exam} if resp_exam else None,
    )
    # Cross-check against the rows actually stored for this case.
    db_exam_count = count_case_exam_items(case_id)
    db_codes = list_case_exam_item_codes(case_id)
    log(
        'C3-db', 'CHECK', f'case_exam_item case_id={case_id}',
        expected_exam_count, db_exam_count,
        f'db_codes={db_codes}',
    )
else:
    _write_log(' SKIP C3-exam/C3-db — C3 未成功')
# C4: GET full — only runs when C3 produced a case id
if case_id:
    r = s.get(f'{BASE}/api/case/cases/{case_id}/full/', headers=auth)
    c4_detail = ''
    c4_resp = None
    if r.headers.get('content-type', '').startswith('application/json'):
        c4_resp = r.json()
        c4_exam = c4_resp.get('exam_items', [])
        c4_detail = f'exam_items={len(c4_exam)}(expect={expected_exam_count})'
    log('C4', 'GET', f'/api/case/cases/{case_id}/full/', 200, r.status_code,
        c4_detail, resp_body=c4_resp)
    if c4_resp is not None:
        # Separate check that the exam item count matches what was submitted in C3.
        log(
            'C4-exam', 'CHECK', f'/api/case/cases/{case_id}/full/ exam_items',
            expected_exam_count, len(c4_resp.get('exam_items', [])),
            f'codes={[e.get("item_code") for e in c4_resp.get("exam_items", [])]}',
        )
else:
    _write_log(' SKIP C4 — C3 未返回 case_id')
# C5: PATCH full — updates only the title
if case_id:
    c5_body = {'title': 'Swagger-更新标题'}
    r = s.patch(f'{BASE}/api/case/cases/{case_id}/full/', json=c5_body, headers=auth)
    log('C5', 'PATCH', f'/api/case/cases/{case_id}/full/', 200, r.status_code,
        req_body=c5_body,
        resp_body=r.json() if r.headers.get('content-type', '').startswith('application/json') else None)
else:
    _write_log(' SKIP C5 — C3 未返回 case_id')
# ═══════════════════════════════════════════════════════════════════════════════
section('汇总')
# ═══════════════════════════════════════════════════════════════════════════════
# Each entry of `results` is (api_id, method, url, expected, actual, status).
total = len(results)
passed = len([row for row in results if row[5] == PASS])
failed_rows = [row for row in results if row[5] == FAIL]
failed = len(failed_rows)
_write_log(f'\n 总计: {total} 个接口 | 通过: {passed} | 失败: {failed}')
if not failed:
    _write_log('\n ALL PASSED - 全部接口 Swagger Try-it-out 验证通过!')
else:
    # List every failed check with its expectation and the observed value.
    _write_log('\n 失败接口:')
    for row in failed_rows:
        _write_log(f' {row[0]} {row[1]} {row[2]} -- expect={row[3]}, got={row[4]}')
# Common tail: point at the log file, close it, exit non-zero when anything failed.
_write_log(f'\n 日志文件: {LOG_FILE}')
_log_fh.close()
sys.exit(1 if failed else 0)