""" Swagger Try-it-out 等效脚本:逐个调用所有接口,验证可达性和基本功能。 运行方式:.venv\\Scripts\\python.exe test/swagger_tryout.py 前提:Django dev server 已在 http://127.0.0.1:8000 运行,Redis 已启动;.env 指向目标库。 认证流程(与当前 API 一致): U1 发码(login) → U2 管理员代注册(机构编码+名称, 默认密码 Pass+手机号) → U3 密码登录 → U4 验证码登录(机构信息) → U4-new 新号自动注册 → U5 重置密码 → U6 改密 → U8 刷新 → /me → U9/U10 → U7 退出 病例段:C1 解析 exam_items → C3 full-create 写入 case_exam_item(校验响应条数与库表条数) 日志输出:logs/test-swagger-YYYY-MM-DD.log(含完整请求体和响应体) """ import io import json import sys import time import subprocess from datetime import datetime from pathlib import Path import requests # 修复 Windows GBK 编码问题 sys.stdout.reconfigure(encoding='utf-8') sys.stderr.reconfigure(encoding='utf-8') BASE = 'http://127.0.0.1:8000' PYTHON = r'D:\01Agent\medical_training\.venv\Scripts\python.exe' CWD = r'D:\01Agent\medical_training' PASS = 'PASS' FAIL = 'FAIL' results = [] # ─── 日志文件 ───────────────────────────────────────────────────────────────── LOGS_DIR = Path(CWD) / 'logs' LOGS_DIR.mkdir(exist_ok=True) LOG_FILE = LOGS_DIR / f'test-swagger-{datetime.now().strftime("%Y-%m-%d")}.log' _log_fh = open(LOG_FILE, 'a', encoding='utf-8') def _write_log(text): """同时写入日志文件和控制台。""" _log_fh.write(text + '\n') _log_fh.flush() print(text) def _format_headers(headers): """将 requests 的 headers 格式化为 JSON 字符串。""" if not headers: return '' return json.dumps(dict(headers), ensure_ascii=False) def log(api_id, method, url, expected, actual, detail='', req_headers=None, req_body=None, resp_headers=None, resp_body=None): status = PASS if actual in (expected if isinstance(expected, (list, tuple)) else [expected]) else FAIL results.append((api_id, method, url, expected, actual, status)) exp_str = str(expected) _write_log(f' {status} {api_id:<6} {method:<6} {url:<50} expect={exp_str:<20} got={actual} {detail}') # 详细请求/响应写入日志文件(不打印到控制台,避免过长) if req_headers is not None: _log_fh.write(f' >>> headers: {_format_headers(req_headers)}\n') if req_body is not None: body_str = json.dumps(req_body, 
ensure_ascii=False, indent=2) if isinstance(req_body, dict) else str(req_body) _log_fh.write(f' >>> body: {body_str}\n') if resp_headers is not None: _log_fh.write(f' <<< headers: {_format_headers(resp_headers)}\n') if resp_body is not None: body_str = json.dumps(resp_body, ensure_ascii=False, indent=2) if isinstance(resp_body, dict) else str(resp_body) # 截断过长响应 if len(body_str) > 2000: body_str = body_str[:2000] + f'... (truncated, {len(body_str)} chars total)' _log_fh.write(f' <<< body: {body_str}\n') _log_fh.flush() def section(title): line = f'\n{"="*90}\n {title}\n{"="*90}' _write_log(line) def django_eval(code): """在独立进程中执行 Django 代码并返回 stdout。""" preamble = ( 'import django, os; ' 'os.environ.setdefault("DJANGO_SETTINGS_MODULE","config.settings"); ' 'django.setup(); ' ) proc = subprocess.run( [PYTHON, '-c', preamble + code], capture_output=True, text=True, cwd=CWD, ) return proc.stdout.strip() def get_sms_code(phone, scene): """从 Redis 读取短信验证码。""" val = django_eval( f'from django.core.cache import cache; print(cache.get("sms:{scene}:{phone}"))' ) return val if val and val != 'None' else None def count_case_exam_items(case_id): """查询 medical_platform.case_exam_item 中该病例的检查项条数。""" val = django_eval( f'from apps.case.models import CaseExamItem; ' f'print(CaseExamItem.objects.filter(case_id={case_id}).count())' ) return int(val) if val and val.isdigit() else 0 def list_case_exam_item_codes(case_id): """返回该病例在库中的 item_code 列表(逗号分隔)。""" val = django_eval( f'from apps.case.models import CaseExamItem; ' f'codes = list(CaseExamItem.objects.filter(case_id={case_id}).order_by("display_order", "id").values_list("item_code", flat=True)); ' f'print(",".join(codes))' ) return val or '' def inject_sms_code(phone, scene, code='123456'): """手动注入短信验证码到 Redis。""" django_eval( f'from django.core.cache import cache; ' f'cache.set("sms:{scene}:{phone}", "{code}", 300)' ) return code # ─── 清理 Redis 残留数据(限流计数等)──────────────────────────────────────── print('\n[准备] 清理 Redis 缓存...') 
django_eval('from django.core.cache import cache; cache.clear(); print("OK")')

# ─── Fixtures: remove test users possibly left over from a previous run ───────
SUPER_PHONE = '13700000090'  # super admin, performs the U2 on-behalf registration
SUPER_PWD = 'Super123'
PHONE = '13700000099'  # CMS user (doctor), used for the U3 password login
STUDENT_PHONE_U4 = '13700000097'  # pre-created student for the U4 code login (non-trial institution must match)
PHONE_ALT = '13700000098'  # dedicated to the U4 trial-institution auto-registration
INST_CODE = 'SWAG_TEST_HOSP'
INST_NAME = 'Swagger测试医院'
# Reserved trial institution (keep in sync with the constants in apps.user.auth).
TRIAL_INST_CODE = 'PKU_LAB_TRIAL'
TRIAL_INST_NAME = '北大医学部(实验室)试用'
CMS_ROLE = 'doctor'  # CMS role of the PHONE user
# A department name unique in the database, so the lookup cannot match several
# rows and fail with CASE_DEPARTMENT_AMBIGUOUS.
DEPT_NAME = 'Swagger儿科'

# Exam items for the manual C3 fallback (the AI exam_items from C1 win when present).
SAMPLE_EXAM_ITEMS = [
    {
        'item_code': 'blood_routine',
        'item_name': '血常规',
        'item_type': 'lab',
        'category': '实验室检查',
        'result_text': 'WBC 10×10^9/L',
        'result_structured': {'wbc': '10×10^9/L'},
        'is_key': True,
        'is_abnormal': False,
        'score_weight': 1.0,
        'display_order': 1,
    },
    {
        'item_code': 'crp',
        'item_name': 'CRP',
        'item_type': 'lab',
        'category': '实验室检查',
        'result_text': 'CRP 8 mg/L',
        'is_key': False,
        'is_abnormal': False,
        'score_weight': 1.0,
        'display_order': 2,
    },
]

django_eval(
    f'from apps.user.models import User; '
    f'User.objects.filter(phone__in=["{SUPER_PHONE}","{PHONE}","{STUDENT_PHONE_U4}","{PHONE_ALT}"]).delete(); print("cleaned")'
)
# Pre-create the super admin (U2 requires an administrator identity).
django_eval(
    f'from apps.user.models import User; '
    f'User.objects.create_user(username="{SUPER_PHONE}", password="{SUPER_PWD}", '
    f' phone="{SUPER_PHONE}", real_name="Swagger超管", role_type="super_admin", status=1); '
    f'print("super ok")'
)
# Pre-create the trial institution (adds data only, no schema change).
django_eval(
    f'from apps.user.models import Institution; '
    f'Institution.objects.get_or_create(code="{TRIAL_INST_CODE}", '
    f' defaults={{"name":"{TRIAL_INST_NAME}","type":"hospital"}}); print("trial ok")'
)
print('[准备] 完成\n')

s = requests.Session()
# Default password assigned by the U2 on-behalf registration: "Pass" + phone.
PASSWORD = f'Pass{PHONE}'


def _institution_fields():
    """Return the institution code/name pair shared by the registration payloads."""
    return {'institution_code': INST_CODE, 'institution_name': INST_NAME}


def get_user_access_token(phone):
    """Issue an access token directly for the user with this phone.

    Used for roles that cannot log in through U3 (e.g. teacher).
    """
    return django_eval(
        f'from apps.user.models import User; '
        f'from rest_framework_simplejwt.tokens import RefreshToken; '
        f'u=User.objects.get(phone="{phone}"); '
        f'print(str(RefreshToken.for_user(u).access_token))'
    )


def _json_or_none(resp):
    """Return the decoded JSON body of *resp*, or None when it is not JSON.

    Keeps a non-JSON reply (e.g. an HTML error page) from aborting the whole
    run before the summary is written.
    """
    if not resp.headers.get('content-type', '').startswith('application/json'):
        return None
    try:
        return resp.json()
    except ValueError:  # requests' JSON decode errors derive from ValueError
        return None


def _code_detail(body):
    """Return 'code=<code>' for a dict body, '' for anything else."""
    return f'code={body.get("code","")}' if isinstance(body, dict) else ''


def _login_tokens(resp):
    """Return the 'tokens' dict of a login response ({} when unavailable)."""
    body = _json_or_none(resp)
    tokens = body.get('tokens', {}) if isinstance(body, dict) else {}
    return tokens if isinstance(tokens, dict) else {}


def _result_rows(data):
    """Return the rows of a list response, paginated ('results') or plain."""
    if isinstance(data, dict):
        return data.get('results', data)
    return data if data is not None else []


# ═══════════════════════════════════════════════════════════════════════════════
section('用户端接口 (U1-U10)')
# ═══════════════════════════════════════════════════════════════════════════════
access = ''
refresh = ''
auth = {}

# INST-LIST: mobile institution list (unpaginated, callable before login)
r = s.get(f'{BASE}/api/user/institution_list/')
inst_body = _json_or_none(r)
inst_list_detail = ''
if r.status_code == 200 and isinstance(inst_body, list):
    trial_flags = [i for i in inst_body if isinstance(i, dict) and i.get('is_trial')]
    inst_list_detail = f'count={len(inst_body)}, trial={len(trial_flags)}'
log('INST-LIST', 'GET', '/api/user/institution_list/', 200, r.status_code, inst_list_detail,
    req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=inst_body)

# U1: send verification code (login scene)
u1_body = {'phone': PHONE, 'scene': 'login'}
r = s.post(f'{BASE}/api/user/auth/send-code/', json=u1_body)
log('U1', 'POST', '/api/user/auth/send-code/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u1_body,
    resp_headers=dict(r.headers), resp_body=_json_or_none(r))

# U2: on-behalf registration (super admin / hospital admin only). The super
# admin creates a user with the CMS role doctor; the institution is created too.
super_access = get_user_access_token(SUPER_PHONE)
super_auth = {'Authorization': f'Bearer {super_access}'}
u2_body = {
    'phone': PHONE,
    'real_name': 'Swagger测试',
    'role_type': CMS_ROLE,
    **_institution_fields(),
}
r = s.post(f'{BASE}/api/user/auth/register/', json=u2_body, headers=super_auth)
u2_resp = _json_or_none(r)
u2_has_tokens = 'tokens' in (u2_resp or {})
u2_detail = 'has_tokens=%s' % u2_has_tokens
log('U2', 'POST', '/api/user/auth/register/ (super_admin)', 201, r.status_code, u2_detail,
    req_headers=r.request.headers, req_body=u2_body,
    resp_headers=dict(r.headers), resp_body=u2_resp)
# U2-tok: on-behalf registration must not return tokens
log('U2-tok', 'CHECK', 'register response has no tokens', False, u2_has_tokens)

# U2-neg1: anonymous on-behalf registration → 401
r = s.post(f'{BASE}/api/user/auth/register/', json=u2_body)
neg_resp = _json_or_none(r)
log('U2-neg1', 'POST', '/api/user/auth/register/ (anon)', 401, r.status_code,
    _code_detail(neg_resp),
    req_body={'phone': PHONE, '...': '...'}, resp_body=neg_resp)

# U3: CMS password login (account + password + role, all three required)
u3_body = {'account': PHONE, 'password': PASSWORD, 'role': CMS_ROLE}
r = s.post(f'{BASE}/api/user/auth/login/', json=u3_body)
log('U3', 'POST', '/api/user/auth/login/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u3_body,
    resp_headers=dict(r.headers), resp_body=_json_or_none(r))
if r.status_code == 200:
    tokens = _login_tokens(r)
    access = tokens.get('access', '')
    refresh = tokens.get('refresh', '')
    auth = {'Authorization': f'Bearer {access}'}

# U3-neg: missing role → 400 AUTH_BAD_CREDENTIALS
r = s.post(f'{BASE}/api/user/auth/login/', json={'account': PHONE, 'password': PASSWORD})
neg_resp = _json_or_none(r)
log('U3-neg', 'POST', '/api/user/auth/login/ (no role)', 400, r.status_code,
    _code_detail(neg_resp),
    req_body={'account': PHONE, 'password': '***'}, resp_body=neg_resp)

# U2-neg2: non-admin (doctor) on-behalf registration → 403 USER_NO_REGISTER_PERMISSION
r = s.post(
    f'{BASE}/api/user/auth/register/',
    json={'phone': '13700000095', 'real_name': '越权注册', 'role_type': 'student',
          **_institution_fields()},
    headers=auth,
)
neg_resp = _json_or_none(r)
log('U2-neg2', 'POST', '/api/user/auth/register/ (doctor)', 403, r.status_code,
    _code_detail(neg_resp), resp_body=neg_resp)

# U2-ha / U2-ha-neg: a hospital admin may only register within its own institution
HA_PHONE = '13700000094'
HA_STUDENT = '13700000093'
django_eval(
    f'from apps.user.models import User, Institution; '
    f'inst=Institution.objects.get(code="{INST_CODE}"); '
    f'User.objects.filter(phone__in=["{HA_PHONE}","{HA_STUDENT}"]).delete(); '
    f'User.objects.create_user(username="{HA_PHONE}", password="Hosp1234", phone="{HA_PHONE}", '
    f' real_name="Swagger医院管理员", role_type="hospital_admin", institution=inst, status=1); '
    f'print("hadmin ok")'
)
ha_auth = {'Authorization': f'Bearer {get_user_access_token(HA_PHONE)}'}

# U2-ha: create a student in the admin's own institution → 201
ha_body = {'phone': HA_STUDENT, 'real_name': '本院学生', 'role_type': 'student',
           **_institution_fields()}
r = s.post(f'{BASE}/api/user/auth/register/', json=ha_body, headers=ha_auth)
log('U2-ha', 'POST', '/api/user/auth/register/ (hospital_admin own inst)', 201, r.status_code,
    req_body=ha_body, resp_body=_json_or_none(r))

# U2-ha-neg: create an account in another institution → 403 USER_INSTITUTION_SCOPE_FORBIDDEN
ha_neg_body = {'phone': '13700000092', 'real_name': '跨院学生', 'role_type': 'student',
               'institution_code': 'SWAG_OTHER_HOSP', 'institution_name': 'Swagger其它医院'}
r = s.post(f'{BASE}/api/user/auth/register/', json=ha_neg_body, headers=ha_auth)
neg_resp = _json_or_none(r)
log('U2-ha-neg', 'POST', '/api/user/auth/register/ (hospital_admin cross inst)', 403, r.status_code,
    _code_detail(neg_resp), req_body=ha_neg_body, resp_body=neg_resp)
django_eval(f'from apps.user.models import User; User.objects.filter(phone__in=["{HA_PHONE}","{HA_STUDENT}"]).delete()')

# U4: code login (mobile; pre-created student + matching institution → 200)
# Pre-create the student with the same institution as the one selected at login.
django_eval(
    f'from apps.user.models import User, Institution; '
    f'inst=Institution.objects.get(code="{INST_CODE}"); '
    f'User.objects.filter(phone="{STUDENT_PHONE_U4}").delete(); '
    f'User.objects.create_user(username="{STUDENT_PHONE_U4}", password=None, '
    f' phone="{STUDENT_PHONE_U4}", real_name="Swagger学生U4", role_type="student", '
    f' institution=inst, status=1); print("student ok")'
)
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': STUDENT_PHONE_U4, 'scene': 'login'})
# Fall back to injecting a known code when none can be read back from the cache.
login_code = get_sms_code(STUDENT_PHONE_U4, 'login') or inject_sms_code(STUDENT_PHONE_U4, 'login')
u4_body = {'phone': STUDENT_PHONE_U4, 'code': login_code, 'institution_code': INST_CODE}
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_body)
u4_resp = _json_or_none(r)
if r.status_code in (200, 201):
    is_new = u4_resp.get('is_new_user') if isinstance(u4_resp, dict) else None
    u4_detail = f'is_new_user={is_new}'
else:
    u4_detail = _code_detail(u4_resp)
log('U4', 'POST', '/api/user/auth/login-code/ (registered student)', 200, r.status_code, u4_detail,
    req_headers=r.request.headers, req_body=u4_body,
    resp_headers=dict(r.headers), resp_body=u4_resp)

# U4-neg: non-trial institution + unregistered phone → 403 AUTH_NOT_REGISTERED
unreg_phone = '13700000096'
django_eval(f'from apps.user.models import User; User.objects.filter(phone="{unreg_phone}").delete()')
inject_sms_code(unreg_phone, 'login')
u4_neg_body = {'phone': unreg_phone, 'code': '123456', 'institution_code': INST_CODE}
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_neg_body)
neg_resp = _json_or_none(r)
log('U4-neg', 'POST', '/api/user/auth/login-code/ (unregistered, non-trial)', 403, r.status_code,
    _code_detail(neg_resp), req_body=u4_neg_body, resp_body=neg_resp)

# U4-new: trial-institution code login auto-registers (new phone → 201, is_new_user=true)
django_eval(f'from apps.user.models import User; User.objects.filter(phone="{PHONE_ALT}").delete()')
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': PHONE_ALT, 'scene': 'login'})
log('U4-pre', 'POST', '/api/user/auth/send-code/ (alt)', 200, r.status_code,
    req_body={'phone': PHONE_ALT, 'scene': 'login'})
alt_code = get_sms_code(PHONE_ALT, 'login') or inject_sms_code(PHONE_ALT, 'login')
u4_new_body = {'phone': PHONE_ALT, 'code': alt_code, 'institution_code': TRIAL_INST_CODE}
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_new_body)
u4_new_resp = _json_or_none(r)
u4_new_detail = ''
if r.status_code in (200, 201):
    is_new = u4_new_resp.get('is_new_user') if isinstance(u4_new_resp, dict) else None
    u4_new_detail = f'is_new_user={is_new}'
log('U4-new', 'POST', '/api/user/auth/login-code/ (trial auto-register)', 201, r.status_code,
    u4_new_detail, req_body=u4_new_body, resp_body=u4_new_resp)
django_eval(f'from apps.user.models import User; User.objects.filter(phone__in=["{PHONE_ALT}","{STUDENT_PHONE_U4}","{unreg_phone}"]).delete()')

# U5: reset password
NEW_PASSWORD = 'SwagNew1'
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': PHONE, 'scene': 'reset'})
reset_code = get_sms_code(PHONE, 'reset')
if not reset_code:
    reset_code = inject_sms_code(PHONE, 'reset', '111111')
u5_body = {'phone': PHONE, 'code': reset_code, 'new_password': NEW_PASSWORD}
r = s.post(f'{BASE}/api/user/auth/reset-password/', json=u5_body)
log('U5', 'POST', '/api/user/auth/reset-password/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u5_body,
    resp_headers=dict(r.headers), resp_body=_json_or_none(r))
# Per the original author's note, reset-password calls
# invalidate_user_tokens(time()+1), so wait more than 1 s before logging in.
time.sleep(1.2)

# Log in again with the new password (CMS)
r = s.post(f'{BASE}/api/user/auth/login/',
           json={'account': PHONE, 'password': NEW_PASSWORD, 'role': CMS_ROLE})
tokens = _login_tokens(r)
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}

# U6: change password
FINAL_PASSWORD = 'SwagFin1'
u6_body = {'old_password': NEW_PASSWORD, 'new_password': FINAL_PASSWORD}
r = s.post(f'{BASE}/api/user/users/change-password/', json=u6_body, headers=auth)
log('U6', 'POST', '/api/user/users/change-password/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u6_body,
    resp_headers=dict(r.headers), resp_body=_json_or_none(r))
# change-password invalidates tokens the same way (per the original note).
time.sleep(1.2)

# Log in again with the final password (CMS)
r = s.post(f'{BASE}/api/user/auth/login/',
           json={'account': PHONE, 'password': FINAL_PASSWORD, 'role': CMS_ROLE})
tokens = _login_tokens(r)
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}

# U8: refresh token
u8_body = {'refresh': refresh}
r = s.post(f'{BASE}/api/user/auth/refresh/', json=u8_body)
u8_resp = _json_or_none(r)
log('U8', 'POST', '/api/user/auth/refresh/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u8_body,
    resp_headers=dict(r.headers), resp_body=u8_resp)
if r.status_code == 200:
    if isinstance(u8_resp, dict):
        access = u8_resp.get('access', access)
    auth = {'Authorization': f'Bearer {access}'}

# /me: GET /me
r = s.get(f'{BASE}/api/user/users/me/', headers=auth)
me_resp = _json_or_none(r)
me_ok = r.status_code == 200 and isinstance(me_resp, dict)
log('/me', 'GET', '/api/user/users/me/', 200, r.status_code,
    f'phone={me_resp.get("phone","")}' if me_ok else '',
    req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=me_resp)
test_user_id = me_resp.get('id') if me_ok else None

# ── U9/U10: user list + user detail ───────────────────────────────────────────
# Create admin, teacher and student users plus a teacher-student relation.
ADMIN_PHONE = '13700000088'
TEACHER_PHONE = '13700000077'
STUDENT_PHONE = '13700000066'
ROLE_PWD = 'RoleTest1'
django_eval(
    f'from apps.user.models import User, TeacherStudentRelation; '
    f'User.objects.filter(phone__in=["{ADMIN_PHONE}","{TEACHER_PHONE}","{STUDENT_PHONE}"]).delete(); '
    f'admin = User.objects.create_user(username="{ADMIN_PHONE}", password="{ROLE_PWD}", '
    f' phone="{ADMIN_PHONE}", real_name="Swagger管理员", role_type="super_admin", status=1); '
    f'teacher = User.objects.create_user(username="{TEACHER_PHONE}", password="{ROLE_PWD}", '
    f' phone="{TEACHER_PHONE}", real_name="Swagger教师", role_type="teacher", status=1); '
    f'student = User.objects.create_user(username="{STUDENT_PHONE}", password="{ROLE_PWD}", '
    f' phone="{STUDENT_PHONE}", real_name="Swagger学生", role_type="student", status=1); '
    f'TeacherStudentRelation.objects.create(teacher=teacher, student=student, '
    f' relation_type="指导", status=1); '
    f'print(f"admin={{admin.id}} teacher={{teacher.id}} student={{student.id}}")'
)

# Admin login (CMS: super_admin)
r = s.post(f'{BASE}/api/user/auth/login/',
           json={'account': ADMIN_PHONE, 'password': ROLE_PWD, 'role': 'super_admin'})
admin_tokens = _login_tokens(r)
admin_auth = {'Authorization': f'Bearer {admin_tokens.get("access", "")}'}
admin_refresh = admin_tokens.get('refresh', '')

# U9: admin fetches the user list
r = s.get(f'{BASE}/api/user/users/', headers=admin_auth)
u9_resp = _json_or_none(r)
u9_detail = ''
if r.status_code == 200:
    u9_detail = f'count={len(_result_rows(u9_resp))}'
log('U9', 'GET', '/api/user/users/', 200, r.status_code, u9_detail,
    req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=u9_resp)

# U9-b: teacher fetches the user list (own students only).
# The teacher role cannot log in through U3 (CMS roles only), so a token is
# issued directly to demonstrate the permission.
teacher_access = get_user_access_token(TEACHER_PHONE)
teacher_auth = {'Authorization': f'Bearer {teacher_access}'}
r = s.get(f'{BASE}/api/user/users/', headers=teacher_auth)
u9b_resp = _json_or_none(r)
u9b_detail = ''
if r.status_code == 200:
    u9b_detail = f'count={len(_result_rows(u9b_resp))}(should=1 student only)'
log('U9-b', 'GET', '/api/user/users/ (teacher)', 200, r.status_code, u9b_detail,
    req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=u9b_resp)

# U9-c: ordinary user (the current test user) fetches the list → 403
r = s.get(f'{BASE}/api/user/users/', headers=auth)
u9c_resp = _json_or_none(r)
log('U9-c', 'GET', '/api/user/users/ (normal)', 403, r.status_code,
    _code_detail(u9c_resp) if r.status_code == 403 else '',
    req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=u9c_resp)

# U10: admin views the student's detail (look up the student id first)
student_id = django_eval(
    f'from apps.user.models import User; '
    f'u = User.objects.get(phone="{STUDENT_PHONE}"); print(u.id)'
)
r = s.get(f'{BASE}/api/user/users/{student_id}/', headers=admin_auth)
u10_resp = _json_or_none(r)
u10_ok = r.status_code == 200 and isinstance(u10_resp, dict)
log('U10', 'GET', f'/api/user/users/{student_id}/', 200, r.status_code,
    f'real_name={u10_resp.get("real_name","")}' if u10_ok else '',
    resp_body=u10_resp)

# U10-b: teacher views the detail of a student under their supervision
r = s.get(f'{BASE}/api/user/users/{student_id}/', headers=teacher_auth)
log('U10-b', 'GET', f'/api/user/users/{student_id}/ (teacher)', 200, r.status_code,
    resp_body=_json_or_none(r))

# U10-c: ordinary user views their own detail
if test_user_id:
    r = s.get(f'{BASE}/api/user/users/{test_user_id}/', headers=auth)
    log('U10-c', 'GET', f'/api/user/users/{test_user_id}/ (self)', 200, r.status_code,
        resp_body=_json_or_none(r))

# Remove the auxiliary users
django_eval(
    f'from apps.user.models import User; '
    f'User.objects.filter(phone__in=["{ADMIN_PHONE}","{TEACHER_PHONE}","{STUDENT_PHONE}"]).delete(); '
    f'print("cleaned")'
)

# U7: logout — kept last because it revokes the refresh token
u7_body = {'refresh': refresh}
r = s.post(f'{BASE}/api/user/auth/logout/', json=u7_body, headers=auth)
log('U7', 'POST', '/api/user/auth/logout/', 200, r.status_code,
    req_body=u7_body, resp_body=_json_or_none(r))

# ═══════════════════════════════════════════════════════════════════════════════
section('病例端接口 (C1-C5)')
# ═══════════════════════════════════════════════════════════════════════════════
# Log in again (logout revoked the previous refresh token)
time.sleep(1.2)
r = s.post(f'{BASE}/api/user/auth/login/',
           json={'account': PHONE, 'password': FINAL_PASSWORD, 'role': CMS_ROLE})
tokens = _login_tokens(r)
access = tokens.get('access', '')
auth = {'Authorization': f'Bearer {access}'}

# Make sure the institution and department exist (same institution_code as U2/U4)
django_eval(
    f'from apps.user.models import Institution, Department; '
    f'inst, _ = Institution.objects.get_or_create(code="{INST_CODE}", '
    f' defaults={{"name":"{INST_NAME}","type":"hospital","province":"北京","city":"北京"}}); '
    f'Department.objects.get_or_create(name="{DEPT_NAME}", '
    f' defaults={{"category":"临床"}}); '
    f'print("OK")'
)

# C1: PDF parsing — uses a real PDF file from the project
REAL_PDF = r'D:\01Agent\medical_training\儿科 病例样例(SOAP+循证).pdf'
with open(REAL_PDF, 'rb') as f:
    r = s.post(
        f'{BASE}/api/case/cases/parse-pdf/',
        files={'files': ('儿科 病例样例(SOAP+循证).pdf', f, 'application/pdf')},
        data={'case_type': 'traditional'},
        headers=auth,
    )
c1_ok = [200, 500, 429]  # 200 = AI parsing succeeded, 500 = AI error, 429 = rate limited
c1_status = r.status_code
c1_resp = _json_or_none(r)
detail = ''
if isinstance(c1_resp, dict):
    detail = f'code={c1_resp.get("code", "")}'
    if c1_status == 200:
        detail = f'parse_id={c1_resp.get("parse_id","")}, keys={list(c1_resp.get("data",{}).keys())[:5]}'
log('C1', 'POST', '/api/case/cases/parse-pdf/', c1_ok, c1_status, detail,
    req_body={'case_type': 'traditional', 'files': ''}, resp_body=c1_resp)

# C2: generate scoring rules — use the data returned by C1 when it succeeded,
# otherwise a hand-written payload
c1_data = None
if c1_status == 200 and isinstance(c1_resp, dict):
    c1_data = c1_resp.get('data', {})
if c1_data:
    c1_exam = c1_data.get('exam_items') or []
    _write_log(
        f' [INFO] C1 解析 exam_items: count={len(c1_exam)}, '
        f'codes={[x.get("item_code") for x in c1_exam if isinstance(x, dict)]}'
    )
c2_payload = c1_data if c1_data else {
    'title': 'Swagger测试病例',
    'case_type': 'traditional',
    'chief_complaint': '发热3天',
    'description': '患儿男4岁发热3天就诊',
    'traditional': {
        'standard_diagnosis': '上呼吸道感染',
        'standard_treatment': '对症治疗',
    },
}
r = s.post(f'{BASE}/api/case/cases/generate-scoring-rules/', json=c2_payload, headers=auth)
c2_ok = [200, 500, 429]
detail = ''
scoring_rules_from_ai = None
c2_resp = _json_or_none(r)
if isinstance(c2_resp, dict):
    if r.status_code == 200:
        scoring_rules_from_ai = c2_resp.get('scoring_rules', [])
        detail = f'generated={c2_resp.get("generated","")}, rules={len(scoring_rules_from_ai)}'
    else:
        detail = f'code={c2_resp.get("code", "")}'
log('C2', 'POST', '/api/case/cases/generate-scoring-rules/', c2_ok, r.status_code, detail,
    req_body=c2_payload, resp_body=c2_resp)

# C3: full-create — prefer the C1 + C2 AI results, otherwise a hand-written payload
if c1_data and scoring_rules_from_ai:
    _write_log(' [INFO] C3 使用 C1 AI 解析 + C2 AI 评分规则组装载荷')
    payload = {**c1_data}
    # Per the original note, C1 often returns a department name that is
    # duplicated in the database and would yield a 400; force the unique one.
    payload['department_name'] = DEPT_NAME
    payload['scoring_rules'] = scoring_rules_from_ai
else:
    _write_log(' [INFO] C3 使用手工表单载荷(C1/C2 未全部成功)')
    payload = {
        'title': 'Swagger-Try-It-Out-病例',
        'case_type': 'traditional',
        'difficulty': 'medium',
        'chief_complaint': '发热 3 天',
        'description': '患儿,男,4 岁,因发热 3 天就诊。',
        'patient_age': 4,
        'patient_gender': 'male',
        'department_name': DEPT_NAME,
        'estimated_minutes': 30,
        'osce_enabled': False,
        'tags': '儿科,发热',
        'traditional': {
            'standard_diagnosis': '上呼吸道感染',
            'standard_treatment': '对症治疗,退热处理',
            'guideline_reference': '《儿科学》第 9 版',
        },
        'scoring_rules': [
            {
                'dimension': '诊断准确性',
                'score_weight': 0.6,
                'ai_auto_score': True,
                'scoring_standard': '准确判断上呼吸道感染',
            },
            {
                'dimension': '治疗方案',
                'score_weight': 0.4,
                'ai_auto_score': True,
                'scoring_standard': '治疗方案合理',
            },
        ],
        'exam_items': SAMPLE_EXAM_ITEMS,
    }

payload_exam_items = payload.get('exam_items') or []
_write_log(
    f' [INFO] C3 提交 exam_items: count={len(payload_exam_items)}, '
    f'codes={[x.get("item_code") for x in payload_exam_items if isinstance(x, dict)]}'
)
r = s.post(f'{BASE}/api/case/cases/full-create/', json=payload, headers=auth)
c3_resp = _json_or_none(r)
log('C3', 'POST', '/api/case/cases/full-create/', 201, r.status_code,
    req_body=payload, resp_body=c3_resp)

case_id = None
expected_exam_count = len(payload_exam_items)
if r.status_code == 201 and isinstance(c3_resp, dict):
    # Safe lookups: a 201 without the expected shape skips the follow-up
    # checks instead of raising KeyError.
    case_id = (c3_resp.get('case') or {}).get('id')
if case_id is not None:
    resp_exam = c3_resp.get('exam_items', [])
    resp_exam_count = len(resp_exam)
    log(
        'C3-exam', 'CHECK', f'/api/case/cases/{case_id}/ (resp exam_items)',
        expected_exam_count, resp_exam_count,
        f'codes={[e.get("item_code") for e in resp_exam]}',
        resp_body={'exam_items': resp_exam} if resp_exam else None,
    )
    db_exam_count = count_case_exam_items(case_id)
    db_codes = list_case_exam_item_codes(case_id)
    log(
        'C3-db', 'CHECK', f'case_exam_item case_id={case_id}',
        expected_exam_count, db_exam_count,
        f'db_codes={db_codes}',
    )
else:
    _write_log(' SKIP C3-exam/C3-db — C3 未成功')

# C4: GET full
if case_id:
    r = s.get(f'{BASE}/api/case/cases/{case_id}/full/', headers=auth)
    c4_detail = ''
    c4_resp = _json_or_none(r)
    c4_exam = c4_resp.get('exam_items', []) if isinstance(c4_resp, dict) else None
    if c4_exam is not None:
        c4_detail = f'exam_items={len(c4_exam)}(expect={expected_exam_count})'
    log('C4', 'GET', f'/api/case/cases/{case_id}/full/', 200, r.status_code, c4_detail,
        resp_body=c4_resp)
    if c4_exam is not None:
        log(
            'C4-exam', 'CHECK', f'/api/case/cases/{case_id}/full/ exam_items',
            expected_exam_count, len(c4_exam),
            f'codes={[e.get("item_code") for e in c4_exam]}',
        )
else:
    _write_log(' SKIP C4 — C3 未返回 case_id')

# C5: PATCH full
if case_id:
    c5_body = {'title': 'Swagger-更新标题'}
    r = s.patch(f'{BASE}/api/case/cases/{case_id}/full/', json=c5_body, headers=auth)
    log('C5', 'PATCH', f'/api/case/cases/{case_id}/full/', 200, r.status_code,
        req_body=c5_body, resp_body=_json_or_none(r))
else:
    _write_log(' SKIP C5 — C3 未返回 case_id')

# ═══════════════════════════════════════════════════════════════════════════════
section('汇总')
# ═══════════════════════════════════════════════════════════════════════════════
total = len(results)
failures = [row for row in results if row[5] == FAIL]
passed = sum(1 for row in results if row[5] == PASS)
failed = len(failures)
_write_log(f'\n 总计: {total} 个接口 | 通过: {passed} | 失败: {failed}')
if failed:
    _write_log('\n 失败接口:')
    for row in failures:
        _write_log(f' {row[0]} {row[1]} {row[2]} -- expect={row[3]}, got={row[4]}')
else:
    _write_log('\n ALL PASSED - 全部接口 Swagger Try-it-out 验证通过!')
_write_log(f'\n 日志文件: {LOG_FILE}')
_log_fh.close()
# Exit status 1 when any check failed, 0 otherwise.
sys.exit(1 if failed else 0)