Files
medical_training/test/swagger_tryout.py
T

482 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Swagger Try-it-out 等效脚本:逐个调用所有接口,验证可达性和基本功能。
运行方式:.venv\\Scripts\\python.exe test/swagger_tryout.py
前提:Django dev server 已在 http://127.0.0.1:8000 运行,Redis 已启动。
日志输出:logs/test-swagger-YYYY-MM-DD.log(含完整请求体和响应体)
"""
import io
import json
import sys
import time
import subprocess
from datetime import datetime
from pathlib import Path
import requests
# Force UTF-8 on both console streams (works around the GBK default on Windows).
for _stream in (sys.stdout, sys.stderr):
    _stream.reconfigure(encoding='utf-8')

# Server under test, plus the interpreter and working directory handed to
# the Django helper subprocesses.
BASE = 'http://127.0.0.1:8000'
CWD = r'D:\01Agent\medical_training'
PYTHON = r'D:\01Agent\medical_training\.venv\Scripts\python.exe'

# Outcome labels and the accumulator of
# (api_id, method, url, expected, actual, status) tuples filled by log().
PASS, FAIL = 'PASS', 'FAIL'
results = []

# --- Log file: opened in append mode, named after today's date ---
LOGS_DIR = Path(CWD, 'logs')
LOGS_DIR.mkdir(exist_ok=True)
LOG_FILE = LOGS_DIR / datetime.now().strftime('test-swagger-%Y-%m-%d.log')
_log_fh = LOG_FILE.open('a', encoding='utf-8')
def _write_log(text):
    """Append *text* as one line to the log file (flushed at once) and echo it to the console."""
    line = text + '\n'
    _log_fh.write(line)
    _log_fh.flush()
    print(text)
def _format_headers(headers):
"""将 requests 的 headers 格式化为 JSON 字符串。"""
if not headers:
return ''
return json.dumps(dict(headers), ensure_ascii=False)
def log(api_id, method, url, expected, actual, detail='',
        req_headers=None, req_body=None, resp_headers=None, resp_body=None):
    """Record the outcome of one API check and log its request/response details.

    *expected* is a single status code or a list/tuple of acceptable codes;
    the check passes when *actual* is one of them. A result tuple is appended
    to ``results`` and a one-line summary goes to both console and log file.
    Any headers/bodies that are not None are written to the log file only,
    which keeps the console output short.
    """
    def _as_text(payload):
        # Dicts are pretty-printed as JSON; everything else falls back to str().
        if isinstance(payload, dict):
            return json.dumps(payload, ensure_ascii=False, indent=2)
        return str(payload)

    if isinstance(expected, (list, tuple)):
        accepted = expected
    else:
        accepted = [expected]
    status = PASS if actual in accepted else FAIL
    results.append((api_id, method, url, expected, actual, status))

    expected_text = str(expected)
    _write_log(f' {status} {api_id:<6} {method:<6} {url:<50} expect={expected_text:<20} got={actual} {detail}')

    # Detailed request/response data: log file only.
    if req_headers is not None:
        _log_fh.write(f' >>> headers: {_format_headers(req_headers)}\n')
    if req_body is not None:
        _log_fh.write(f' >>> body: {_as_text(req_body)}\n')
    if resp_headers is not None:
        _log_fh.write(f' <<< headers: {_format_headers(resp_headers)}\n')
    if resp_body is not None:
        rendered = _as_text(resp_body)
        total_chars = len(rendered)
        # Cap very long response bodies at 2000 characters.
        if total_chars > 2000:
            rendered = f'{rendered[:2000]}... (truncated, {total_chars} chars total)'
        _log_fh.write(f' <<< body: {rendered}\n')
    _log_fh.flush()
def section(title):
    """Write a banner for *title*, framed by two 90-character '=' rules, to log and console."""
    rule = '=' * 90
    _write_log(f'\n{rule}\n {title}\n{rule}')
def django_eval(code):
    """Run a snippet of Django code in a separate interpreter and return its stdout.

    *code* is appended to a preamble that sets ``DJANGO_SETTINGS_MODULE``
    (defaulting to ``config.settings``) and calls ``django.setup()``. The
    snippet is executed as ``PYTHON -c ...`` with ``CWD`` as working directory.

    Returns the child's stdout with surrounding whitespace stripped. A
    non-zero exit status does not raise, so the return contract is unchanged,
    but it is reported on stderr together with the child's stderr so that a
    failed setup/cleanup snippet is no longer silent.
    """
    preamble = (
        'import django, os; '
        'os.environ.setdefault("DJANGO_SETTINGS_MODULE","config.settings"); '
        'django.setup(); '
    )
    proc = subprocess.run(
        [PYTHON, '-c', preamble + code],
        capture_output=True, text=True, cwd=CWD,
    )
    if proc.returncode != 0:
        # Surface the failure without changing what callers receive.
        print(
            f'[django_eval] exit code {proc.returncode}: {proc.stderr.strip()}',
            file=sys.stderr,
        )
    return proc.stdout.strip()
def get_sms_code(phone, scene):
    """Look up the SMS verification code stored under ``sms:<scene>:<phone>`` in the Django cache.

    Returns the code as a string, or None when the helper process printed
    nothing or printed the literal ``None`` (no cache entry).
    """
    snippet = f'from django.core.cache import cache; print(cache.get("sms:{scene}:{phone}"))'
    output = django_eval(snippet)
    if not output or output == 'None':
        return None
    return output
def inject_sms_code(phone, scene, code='123456'):
    """Store *code* under ``sms:<scene>:<phone>`` in the Django cache and return it.

    The value is set with 300 as the third ``cache.set`` argument (the timeout).
    """
    cache_key = f'sms:{scene}:{phone}'
    snippet = (
        'from django.core.cache import cache; '
        f'cache.set("{cache_key}", "{code}", 300)'
    )
    django_eval(snippet)
    return code
# --- Clear leftover cache state (rate-limit counters etc.) before the run ---
print('\n[准备] 清理 Redis 缓存...')
_clear_cache_snippet = 'from django.core.cache import cache; cache.clear(); print("OK")'
django_eval(_clear_cache_snippet)

# Credentials of the throw-away account used by the user-endpoint checks.
PHONE = '13700000099'
PASSWORD = 'SwagTest1'

# Remove the test user that a previous run may have left behind.
_delete_user_snippet = (
    'from apps.user.models import User; '
    f'User.objects.filter(phone="{PHONE}").delete(); print("cleaned")'
)
django_eval(_delete_user_snippet)
print('[准备] 完成\n')

# One HTTP session shared by every request below.
s = requests.Session()
# =============================================================================
# User endpoints (U1-U10)
# =============================================================================


def _json_or_none(resp):
    """Return the decoded JSON body of *resp*, or None when the body is not valid JSON.

    Keeps a non-JSON response (e.g. an HTML error page) from aborting the
    whole run; the check is then simply logged without a response body.
    """
    try:
        return resp.json()
    except ValueError:  # requests' JSON decode errors derive from ValueError
        return None


def _json_dict(resp):
    """Return *resp*'s JSON body when it is a JSON object, otherwise an empty dict."""
    body = _json_or_none(resp)
    return body if isinstance(body, dict) else {}


def _result_rows(body):
    """Return the rows of a list response, paginated ({'results': [...]}) or plain."""
    if isinstance(body, dict):
        return body.get('results', body)
    return body if body is not None else []


section('用户端接口 (U1-U10)')

# U1: send a verification code for the "register" scene.
u1_body = {'phone': PHONE, 'scene': 'register'}
r = s.post(f'{BASE}/api/user/auth/send-code/', json=u1_body)
log('U1', 'POST', '/api/user/auth/send-code/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u1_body,
    resp_headers=dict(r.headers), resp_body=_json_or_none(r))

# U2: register. Use the stored code; inject a known one when none is found.
code = get_sms_code(PHONE, 'register') or inject_sms_code(PHONE, 'register')
u2_body = {'phone': PHONE, 'code': code, 'password': PASSWORD, 'real_name': 'Swagger测试'}
r = s.post(f'{BASE}/api/user/auth/register/', json=u2_body)
log('U2', 'POST', '/api/user/auth/register/', 201, r.status_code,
    req_headers=r.request.headers, req_body=u2_body,
    resp_headers=dict(r.headers), resp_body=_json_or_none(r))

# U3: password login.
u3_body = {'phone': PHONE, 'password': PASSWORD}
r = s.post(f'{BASE}/api/user/auth/login/', json=u3_body)
log('U3', 'POST', '/api/user/auth/login/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u3_body,
    resp_headers=dict(r.headers), resp_body=_json_or_none(r))
tokens = _json_dict(r).get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}

# U4: verification-code login (the send-code request itself is not logged).
s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': PHONE, 'scene': 'login'})
login_code = get_sms_code(PHONE, 'login') or inject_sms_code(PHONE, 'login', '654321')
u4_body = {'phone': PHONE, 'code': login_code}
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_body)
log('U4', 'POST', '/api/user/auth/login-code/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u4_body,
    resp_headers=dict(r.headers), resp_body=_json_or_none(r))
if r.status_code == 200:
    # Prefer the fresh tokens but keep the previous ones if a field is missing.
    tokens = _json_dict(r).get('tokens', {})
    access = tokens.get('access', access)
    refresh = tokens.get('refresh', refresh)
    auth = {'Authorization': f'Bearer {access}'}

# U5: reset password.
NEW_PASSWORD = 'SwagNew1'
s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': PHONE, 'scene': 'reset'})
reset_code = get_sms_code(PHONE, 'reset') or inject_sms_code(PHONE, 'reset', '111111')
u5_body = {'phone': PHONE, 'code': reset_code, 'new_password': NEW_PASSWORD}
r = s.post(f'{BASE}/api/user/auth/reset-password/', json=u5_body)
log('U5', 'POST', '/api/user/auth/reset-password/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u5_body,
    resp_headers=dict(r.headers), resp_body=_json_or_none(r))
# Per the original author's note, reset-password calls
# invalidate_user_tokens(time()+1), so wait more than 1s before logging in.
time.sleep(1.2)
# Log in again with the new password.
r = s.post(f'{BASE}/api/user/auth/login/', json={'phone': PHONE, 'password': NEW_PASSWORD})
tokens = _json_dict(r).get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}

# U6: change password.
FINAL_PASSWORD = 'SwagFin1'
u6_body = {'old_password': NEW_PASSWORD, 'new_password': FINAL_PASSWORD}
r = s.post(f'{BASE}/api/user/users/change-password/', json=u6_body, headers=auth)
log('U6', 'POST', '/api/user/users/change-password/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u6_body,
    resp_headers=dict(r.headers), resp_body=_json_or_none(r))
# Per the original author's note, change-password invalidates tokens the same way.
time.sleep(1.2)
# Log in again with the final password.
r = s.post(f'{BASE}/api/user/auth/login/', json={'phone': PHONE, 'password': FINAL_PASSWORD})
tokens = _json_dict(r).get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}

# U8: refresh the access token.
u8_body = {'refresh': refresh}
r = s.post(f'{BASE}/api/user/auth/refresh/', json=u8_body)
log('U8', 'POST', '/api/user/auth/refresh/', 200, r.status_code,
    req_headers=r.request.headers, req_body=u8_body,
    resp_headers=dict(r.headers), resp_body=_json_or_none(r))
if r.status_code == 200:
    access = _json_dict(r).get('access', access)
    auth = {'Authorization': f'Bearer {access}'}

# /me: fetch the current user's profile and remember its id for U10-c.
r = s.get(f'{BASE}/api/user/users/me/', headers=auth)
me_body = _json_dict(r)
log('/me', 'GET', '/api/user/users/me/', 200, r.status_code,
    f'phone={me_body.get("phone","")}' if r.status_code == 200 else '',
    req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=_json_or_none(r))
test_user_id = me_body.get('id') if r.status_code == 200 else None

# -- U9/U10: user list + user detail ------------------------------------------
# Create admin, teacher and student accounts plus a teacher-student relation.
ADMIN_PHONE = '13700000088'
TEACHER_PHONE = '13700000077'
STUDENT_PHONE = '13700000066'
ROLE_PWD = 'RoleTest1'
django_eval(
    f'from apps.user.models import User, TeacherStudentRelation; '
    f'User.objects.filter(phone__in=["{ADMIN_PHONE}","{TEACHER_PHONE}","{STUDENT_PHONE}"]).delete(); '
    f'admin = User.objects.create_user(username="{ADMIN_PHONE}", password="{ROLE_PWD}", '
    f' phone="{ADMIN_PHONE}", real_name="Swagger管理员", role_type="super_admin", status=1); '
    f'teacher = User.objects.create_user(username="{TEACHER_PHONE}", password="{ROLE_PWD}", '
    f' phone="{TEACHER_PHONE}", real_name="Swagger教师", role_type="teacher", status=1); '
    f'student = User.objects.create_user(username="{STUDENT_PHONE}", password="{ROLE_PWD}", '
    f' phone="{STUDENT_PHONE}", real_name="Swagger学生", role_type="student", status=1); '
    f'TeacherStudentRelation.objects.create(teacher=teacher, student=student, '
    f' relation_type="指导", status=1); '
    f'print(f"admin={{admin.id}} teacher={{teacher.id}} student={{student.id}}")'
)
try:
    # Admin login.
    r = s.post(f'{BASE}/api/user/auth/login/', json={'phone': ADMIN_PHONE, 'password': ROLE_PWD})
    admin_tokens = _json_dict(r).get('tokens', {})
    admin_auth = {'Authorization': f'Bearer {admin_tokens.get("access", "")}'}
    admin_refresh = admin_tokens.get('refresh', '')

    # U9: admin fetches the user list.
    r = s.get(f'{BASE}/api/user/users/', headers=admin_auth)
    u9_detail = ''
    if r.status_code == 200:
        u9_data = _json_or_none(r)
        u9_results = _result_rows(u9_data)
        u9_detail = f'count={len(u9_results)}'
    log('U9', 'GET', '/api/user/users/', 200, r.status_code, u9_detail,
        req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=_json_or_none(r))

    # U9-b: teacher fetches the user list (expected: only their own students).
    r_teacher_login = s.post(f'{BASE}/api/user/auth/login/',
                             json={'phone': TEACHER_PHONE, 'password': ROLE_PWD})
    teacher_tokens = _json_dict(r_teacher_login).get('tokens', {})
    teacher_auth = {'Authorization': f'Bearer {teacher_tokens.get("access", "")}'}
    r = s.get(f'{BASE}/api/user/users/', headers=teacher_auth)
    u9b_detail = ''
    if r.status_code == 200:
        u9b_data = _json_or_none(r)
        u9b_results = _result_rows(u9b_data)
        u9b_detail = f'count={len(u9b_results)}(should=1 student only)'
    log('U9-b', 'GET', '/api/user/users/ (teacher)', 200, r.status_code, u9b_detail,
        req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=_json_or_none(r))

    # U9-c: ordinary user (the current test user) requests the list -> 403.
    r = s.get(f'{BASE}/api/user/users/', headers=auth)
    log('U9-c', 'GET', '/api/user/users/ (normal)', 403, r.status_code,
        f'code={_json_dict(r).get("code","")}' if r.status_code == 403 else '',
        req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=_json_or_none(r))

    # U10: admin views the student's detail; look the student id up first.
    student_id = django_eval(
        f'from apps.user.models import User; '
        f'u = User.objects.get(phone="{STUDENT_PHONE}"); print(u.id)'
    )
    r = s.get(f'{BASE}/api/user/users/{student_id}/', headers=admin_auth)
    log('U10', 'GET', f'/api/user/users/{student_id}/', 200, r.status_code,
        f'real_name={_json_dict(r).get("real_name","")}' if r.status_code == 200 else '',
        resp_body=_json_or_none(r))

    # U10-b: teacher views the detail of their own student.
    r = s.get(f'{BASE}/api/user/users/{student_id}/', headers=teacher_auth)
    log('U10-b', 'GET', f'/api/user/users/{student_id}/ (teacher)', 200, r.status_code,
        resp_body=_json_or_none(r))

    # U10-c: ordinary user views their own detail.
    if test_user_id:
        r = s.get(f'{BASE}/api/user/users/{test_user_id}/', headers=auth)
        log('U10-c', 'GET', f'/api/user/users/{test_user_id}/ (self)', 200, r.status_code,
            resp_body=_json_or_none(r))
finally:
    # Always remove the helper accounts (one is a super_admin), even when a
    # check above raised.
    django_eval(
        f'from apps.user.models import User; '
        f'User.objects.filter(phone__in=["{ADMIN_PHONE}","{TEACHER_PHONE}","{STUDENT_PHONE}"]).delete(); '
        f'print("cleaned")'
    )

# U7: logout - kept last because it revokes the refresh token.
u7_body = {'refresh': refresh}
r = s.post(f'{BASE}/api/user/auth/logout/', json=u7_body, headers=auth)
log('U7', 'POST', '/api/user/auth/logout/', 200, r.status_code,
    req_body=u7_body, resp_body=_json_or_none(r))
# =============================================================================
# Case endpoints (C1-C5)
# =============================================================================


def _parse_json_response(resp):
    """Return ``resp.json()`` when the response declares a JSON content type, else None."""
    if resp.headers.get('content-type', '').startswith('application/json'):
        return resp.json()
    return None


section('病例端接口 (C1-C5)')

# Log in again (logout revoked the previous refresh token).
time.sleep(1.2)
r = s.post(f'{BASE}/api/user/auth/login/', json={'phone': PHONE, 'password': FINAL_PASSWORD})
login_body = _parse_json_response(r)
tokens = login_body.get('tokens', {}) if isinstance(login_body, dict) else {}
access = tokens.get('access', '')
auth = {'Authorization': f'Bearer {access}'}

# Make sure the department referenced by the manual payload exists.
django_eval(
    'from apps.user.models import Institution, Department; '
    'inst, _ = Institution.objects.get_or_create(name="测试医院", '
    ' defaults={"type":"hospital","province":"北京","city":"北京"}); '
    'Department.objects.get_or_create(name="儿科", '
    ' defaults={"institution":inst,"category":"临床"}); '
    'print("OK")'
)

# C1: PDF parsing - uploads the real sample PDF shipped with the project.
REAL_PDF = r'D:\01Agent\medical_training\儿科 病例样例(SOAP+循证).pdf'
with open(REAL_PDF, 'rb') as f:
    r = s.post(
        f'{BASE}/api/case/cases/parse-pdf/',
        files={'files': ('儿科 病例样例(SOAP+循证).pdf', f, 'application/pdf')},
        data={'case_type': 'traditional'},
        headers=auth,
    )
c1_ok = [200, 500, 429]  # 200 = AI parse succeeded, 500 = AI error, 429 = rate limited
detail = ''
c1_resp = _parse_json_response(r)
if isinstance(c1_resp, dict):
    detail = f'code={c1_resp.get("code", "")}'
    if r.status_code == 200:
        detail = f'parse_id={c1_resp.get("parse_id","")}, keys={list(c1_resp.get("data",{}).keys())[:5]}'
log('C1', 'POST', '/api/case/cases/parse-pdf/', c1_ok, r.status_code, detail,
    req_body={'case_type': 'traditional', 'files': '<PDF file>'}, resp_body=c1_resp)

# C2: generate scoring rules - reuse C1's parsed data when C1 succeeded,
# otherwise fall back to a hand-written payload.
c1_data = None
if r.status_code == 200 and isinstance(c1_resp, dict):
    # Read from c1_resp: it is always bound, even for a 200 with a non-JSON body.
    c1_data = c1_resp.get('data', {})
c2_payload = c1_data if c1_data else {
    'title': 'Swagger测试病例',
    'case_type': 'traditional',
    'chief_complaint': '发热3天',
    'description': '患儿男4岁发热3天就诊',
    'traditional': {
        'standard_diagnosis': '上呼吸道感染',
        'standard_treatment': '对症治疗',
    },
}
r = s.post(f'{BASE}/api/case/cases/generate-scoring-rules/', json=c2_payload, headers=auth)
c2_ok = [200, 500, 429]
detail = ''
scoring_rules_from_ai = None
c2_resp = _parse_json_response(r)
if isinstance(c2_resp, dict):
    if r.status_code == 200:
        scoring_rules_from_ai = c2_resp.get('scoring_rules', [])
        detail = f'generated={c2_resp.get("generated","")}, rules={len(scoring_rules_from_ai)}'
    else:
        detail = f'code={c2_resp.get("code", "")}'
log('C2', 'POST', '/api/case/cases/generate-scoring-rules/', c2_ok, r.status_code, detail,
    req_body=c2_payload, resp_body=c2_resp)

# C3: full-create - prefer the C1 + C2 AI results, otherwise use a manual payload.
if c1_data and scoring_rules_from_ai:
    _write_log(' [INFO] C3 使用 C1 AI 解析 + C2 AI 评分规则组装载荷')
    payload = {**c1_data}
    payload['scoring_rules'] = scoring_rules_from_ai
else:
    _write_log(' [INFO] C3 使用手工表单载荷(C1/C2 未全部成功)')
    payload = {
        'title': 'Swagger-Try-It-Out-病例',
        'case_type': 'traditional',
        'difficulty': 'medium',
        'chief_complaint': '发热 3 天',
        'description': '患儿,男,4 岁,因发热 3 天就诊。',
        'patient_age': 4,
        'patient_gender': 'male',
        'department_name': '儿科',
        'estimated_minutes': 30,
        'osce_enabled': False,
        'tags': '儿科,发热',
        'traditional': {
            'standard_diagnosis': '上呼吸道感染',
            'standard_treatment': '对症治疗,退热处理',
            'guideline_reference': '《儿科学》第 9 版',
        },
        'scoring_rules': [
            {
                'dimension': '诊断准确性',
                'score_weight': 0.6,
                'ai_auto_score': True,
                'scoring_standard': '准确判断上呼吸道感染',
            },
            {
                'dimension': '治疗方案',
                'score_weight': 0.4,
                'ai_auto_score': True,
                'scoring_standard': '治疗方案合理',
            },
        ],
    }
r = s.post(f'{BASE}/api/case/cases/full-create/', json=payload, headers=auth)
c3_resp = _parse_json_response(r)
log('C3', 'POST', '/api/case/cases/full-create/', 201, r.status_code,
    req_body=payload, resp_body=c3_resp)
case_id = None
if r.status_code == 201 and isinstance(c3_resp, dict):
    # A 201 without the expected 'case' object skips C4/C5 instead of raising.
    created_case = c3_resp.get('case')
    if isinstance(created_case, dict):
        case_id = created_case.get('id')

if case_id:
    # C4: GET full
    r = s.get(f'{BASE}/api/case/cases/{case_id}/full/', headers=auth)
    log('C4', 'GET', f'/api/case/cases/{case_id}/full/', 200, r.status_code,
        resp_body=_parse_json_response(r))

    # C5: PATCH full
    c5_body = {'title': 'Swagger-更新标题'}
    r = s.patch(f'{BASE}/api/case/cases/{case_id}/full/', json=c5_body, headers=auth)
    log('C5', 'PATCH', f'/api/case/cases/{case_id}/full/', 200, r.status_code,
        req_body=c5_body,
        resp_body=_parse_json_response(r))
else:
    _write_log(' SKIP C4 — C3 未返回 case_id')
    _write_log(' SKIP C5 — C3 未返回 case_id')
# =============================================================================
# Summary: print totals, list failed checks, and exit 1 on any failure (else 0)
# =============================================================================
section('汇总')

failures = [entry for entry in results if entry[5] == FAIL]
total = len(results)
passed = len([entry for entry in results if entry[5] == PASS])
failed = len(failures)
_write_log(f'\n 总计: {total} 个接口 | 通过: {passed} | 失败: {failed}')

if not failures:
    _write_log('\n ALL PASSED - 全部接口 Swagger Try-it-out 验证通过!')
else:
    _write_log('\n 失败接口:')
    for failed_id, failed_method, failed_url, failed_expected, failed_actual, _ in failures:
        _write_log(f' {failed_id} {failed_method} {failed_url} -- expect={failed_expected}, got={failed_actual}')

# Common tail for both outcomes: point at the log file, close it, then exit.
_write_log(f'\n 日志文件: {LOG_FILE}')
_log_fh.close()
sys.exit(1 if failures else 0)