Files
medical_training/test/swagger_tryout.py
T

612 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Swagger "Try it out" equivalent script: call every endpoint one by one to verify reachability and basic behavior.
How to run: .venv\\Scripts\\python.exe test/swagger_tryout.py
Prerequisites: the Django dev server is running at http://127.0.0.1:8000, Redis is up, and .env points at the target database.
Authentication flow (matches the current API):
U1 send code (login) → U2 admin-proxied registration (institution code + name, default password "Pass" + phone number)
→ U3 password login → U4 code login (with institution info) → U4-new auto-registration of a new phone number
→ U5 reset password → U6 change password → U8 refresh → /me → U9/U10 → U7 logout
Case section: C1 parses exam_items → C3 full-create writes case_exam_item (checks the response count and the table row count)
Log output: logs/test-swagger-YYYY-MM-DD.log (includes request and response bodies)
"""
import io
import json
import sys
import time
import subprocess
from datetime import datetime
from pathlib import Path
import requests
# Work around the Windows GBK console encoding by forcing UTF-8 on stdout/stderr.
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
# Base URL of the server under test.
BASE = 'http://127.0.0.1:8000'
# Interpreter and working directory used by django_eval() for its subprocesses.
# NOTE(review): both are absolute, machine-specific Windows paths.
PYTHON = r'D:\01Agent\medical_training\.venv\Scripts\python.exe'
CWD = r'D:\01Agent\medical_training'
# Status labels recorded by log().
PASS = 'PASS'
FAIL = 'FAIL'
# One (api_id, method, url, expected, actual, status) tuple per check, appended by log().
results = []
# ─── Log file ─────────────────────────────────────────────────────────────────
LOGS_DIR = Path(CWD) / 'logs'
LOGS_DIR.mkdir(exist_ok=True)
# One log file per calendar day; runs on the same day append to the same file.
LOG_FILE = LOGS_DIR / f'test-swagger-{datetime.now().strftime("%Y-%m-%d")}.log'
# Shared handle used by _write_log() and log(); closed at the end of the script before sys.exit().
_log_fh = open(LOG_FILE, 'a', encoding='utf-8')
def _write_log(text):
    """Emit ``text`` as a single line to the log file and echo it to the console."""
    line = text + '\n'
    _log_fh.write(line)
    # Flush right away so the log file is current even if the script aborts.
    _log_fh.flush()
    print(text)
def _format_headers(headers):
    """Render a header mapping as a JSON string.

    Args:
        headers: Any object accepted by ``dict()`` (e.g. the headers of a
            requests request/response), or a falsy value.

    Returns:
        The JSON text (non-ASCII characters kept as-is), or ``''`` when
        ``headers`` is falsy.
    """
    if headers:
        as_plain_dict = dict(headers)
        return json.dumps(as_plain_dict, ensure_ascii=False)
    return ''
def _format_body(body):
    """Render a request/response body for the log file.

    dict and list bodies (the two JSON container types) are pretty-printed as
    JSON; anything else is passed through ``str()``.
    """
    if isinstance(body, (dict, list)):
        return json.dumps(body, ensure_ascii=False, indent=2)
    return str(body)


def log(api_id, method, url, expected, actual, detail='',
        req_headers=None, req_body=None, resp_headers=None, resp_body=None):
    """Record one check and write it to the console and the log file.

    Args:
        api_id: Short label of the check (e.g. ``'U1'``).
        method: HTTP method, or another tag such as ``'CHECK'``.
        url: Endpoint path or free-form description of what was checked.
        expected: A single accepted value, or a list/tuple of accepted values.
        actual: The observed value; the check passes when it is among the
            accepted values.
        detail: Extra text appended to the one-line summary.
        req_headers: Request headers, written to the log file when not None.
        req_body: Request body, written to the log file when not None.
        resp_headers: Response headers, written to the log file when not None.
        resp_body: Response body, written to the log file when not None;
            truncated to 2000 characters.

    Side effects:
        Appends ``(api_id, method, url, expected, actual, status)`` to the
        module-level ``results`` list.
    """
    accepted = expected if isinstance(expected, (list, tuple)) else [expected]
    status = PASS if actual in accepted else FAIL
    results.append((api_id, method, url, expected, actual, status))
    exp_str = str(expected)
    _write_log(f' {status} {api_id:<6} {method:<6} {url:<50} expect={exp_str:<20} got={actual} {detail}')
    # Request/response details go to the log file only (not the console) to
    # keep console output short.
    if req_headers is not None:
        _log_fh.write(f' >>> headers: {_format_headers(req_headers)}\n')
    if req_body is not None:
        _log_fh.write(f' >>> body: {_format_body(req_body)}\n')
    if resp_headers is not None:
        _log_fh.write(f' <<< headers: {_format_headers(resp_headers)}\n')
    if resp_body is not None:
        body_str = _format_body(resp_body)
        # Truncate overly long responses, noting the full length.
        if len(body_str) > 2000:
            body_str = body_str[:2000] + f'... (truncated, {len(body_str)} chars total)'
        _log_fh.write(f' <<< body: {body_str}\n')
    _log_fh.flush()
def section(title):
    """Write a section banner: a blank line, then the title between two 90-char '=' rules."""
    rule = '=' * 90
    banner = f'\n{rule}\n {title}\n{rule}'
    _write_log(banner)
def django_eval(code):
    """Run a Django snippet in a separate interpreter process and return its stdout.

    The snippet is prefixed with a preamble that sets ``DJANGO_SETTINGS_MODULE``
    (if unset) to ``config.settings`` and calls ``django.setup()``. It is
    executed with ``PYTHON -c`` from the ``CWD`` directory.

    Args:
        code: Python source, written as ``;``-separated statements, appended to
            the preamble.

    Returns:
        The child's stdout with surrounding whitespace stripped. A failing
        child still yields whatever it printed (possibly ``''``); the failure
        itself is reported on stderr rather than raised, so callers keep
        their existing fallbacks.
    """
    preamble = (
        'import django, os; '
        'os.environ.setdefault("DJANGO_SETTINGS_MODULE","config.settings"); '
        'django.setup(); '
    )
    proc = subprocess.run(
        [PYTHON, '-c', preamble + code],
        capture_output=True, text=True, cwd=CWD,
    )
    if proc.returncode != 0:
        # Without this, a broken snippet or environment is indistinguishable
        # from a snippet that simply printed nothing.
        print(
            f'[django_eval] exit code {proc.returncode}: {proc.stderr.strip()}',
            file=sys.stderr,
        )
    return proc.stdout.strip()
def get_sms_code(phone, scene):
    """Read the SMS verification code stored in the Django cache.

    Looks up the key ``sms:<scene>:<phone>`` via ``django_eval``.

    Returns:
        The code as a string, or ``None`` when nothing was printed or the
        cache returned ``None``.
    """
    snippet = f'from django.core.cache import cache; print(cache.get("sms:{scene}:{phone}"))'
    output = django_eval(snippet)
    if not output or output == 'None':
        return None
    return output
def count_case_exam_items(case_id):
    """Count the ``CaseExamItem`` rows belonging to ``case_id``.

    Returns:
        The row count as an int; ``0`` when the subprocess output is empty or
        not purely digits.
    """
    snippet = (
        f'from apps.case.models import CaseExamItem; '
        f'print(CaseExamItem.objects.filter(case_id={case_id}).count())'
    )
    output = django_eval(snippet)
    if output and output.isdigit():
        return int(output)
    return 0
def list_case_exam_item_codes(case_id):
    """Return the stored ``item_code`` values for ``case_id`` as one comma-separated string.

    Rows are ordered by ``display_order`` and then ``id``. An empty string is
    returned when the subprocess printed nothing.
    """
    snippet = (
        f'from apps.case.models import CaseExamItem; '
        f'codes = list(CaseExamItem.objects.filter(case_id={case_id}).order_by("display_order", "id").values_list("item_code", flat=True)); '
        f'print(",".join(codes))'
    )
    output = django_eval(snippet)
    return output if output else ''
def inject_sms_code(phone, scene, code='123456'):
    """Store ``code`` in the Django cache under ``sms:<scene>:<phone>`` with a 300 timeout.

    Returns:
        ``code`` unchanged, so the call can be used inline as the value to submit.
    """
    cache_key = f'sms:{scene}:{phone}'
    snippet = (
        'from django.core.cache import cache; '
        f'cache.set("{cache_key}", "{code}", 300)'
    )
    django_eval(snippet)
    return code
# ─── Clear leftover data in Redis (rate-limit counters, etc.) ────────────────
print('\n[准备] 清理 Redis 缓存...')
# NOTE: cache.clear() wipes the whole Django cache, not just this script's keys.
django_eval('from django.core.cache import cache; cache.clear(); print("OK")')
# Test users possibly left over from a previous run are deleted further below.
PHONE = '13700000099'
PHONE_ALT = '13700000098' # dedicated to the U4 auto-registration case (unregistered number)
INST_CODE = 'SWAG_TEST_HOSP'
INST_NAME = 'Swagger测试医院'
# Department name that is unique across the whole database, so that
# resolve_department("儿科") does not match several rows → CASE_DEPARTMENT_AMBIGUOUS
DEPT_NAME = 'Swagger儿科'
# Exam items for the C3 manual fallback payload (when C1 returns a parse result,
# the AI-produced exam_items are used instead)
SAMPLE_EXAM_ITEMS = [
{
'item_code': 'blood_routine',
'item_name': '血常规',
'item_type': 'lab',
'category': '实验室检查',
'result_text': 'WBC 10×10^9/L',
'result_structured': {'wbc': '10×10^9/L'},
'is_key': True,
'is_abnormal': False,
'score_weight': 1.0,
'display_order': 1,
},
{
'item_code': 'crp',
'item_name': 'CRP',
'item_type': 'lab',
'category': '实验室检查',
'result_text': 'CRP 8 mg/L',
'is_key': False,
'is_abnormal': False,
'score_weight': 1.0,
'display_order': 2,
},
]
# Delete the two test users by phone number so the run starts clean.
django_eval(
f'from apps.user.models import User; '
f'User.objects.filter(phone__in=["{PHONE}","{PHONE_ALT}"]).delete(); print("cleaned")'
)
print('[准备] 完成\n')
# One HTTP session shared by every request in the script.
s = requests.Session()
# Default password assigned by U2 admin-proxied registration: "Pass" + phone number
PASSWORD = f'Pass{PHONE}'
def _institution_fields():
    """Build the institution code/name fields merged into the U2 and U4 request bodies."""
    return dict(institution_code=INST_CODE, institution_name=INST_NAME)
# ═══════════════════════════════════════════════════════════════════════════════
section('用户端接口 (U1-U10)')
# ═══════════════════════════════════════════════════════════════════════════════
# Tokens and the Authorization header are filled in as the login steps below succeed.
access = ''
refresh = ''
auth = {}
# U1: send a verification code (login scene; unregistered users can request one too)
u1_body = {'phone': PHONE, 'scene': 'login'}
r = s.post(f'{BASE}/api/user/auth/send-code/', json=u1_body)
log('U1', 'POST', '/api/user/auth/send-code/', 200, r.status_code,
req_headers=r.request.headers, req_body=u1_body,
resp_headers=dict(r.headers), resp_body=r.json())
# U2: admin-proxied registration (no verification code needed; default password is "Pass" + phone number)
u2_body = {
'phone': PHONE,
'real_name': 'Swagger测试',
'role_type': 'student',
**_institution_fields(),
}
r = s.post(f'{BASE}/api/user/auth/register/', json=u2_body)
log('U2', 'POST', '/api/user/auth/register/', 201, r.status_code,
req_headers=r.request.headers, req_body=u2_body,
resp_headers=dict(r.headers), resp_body=r.json())
# On 201, take the tokens from the registration response.
if r.status_code == 201:
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
# U3: password login ("Pass" + phone number)
u3_body = {'phone': PHONE, 'password': PASSWORD}
r = s.post(f'{BASE}/api/user/auth/login/', json=u3_body)
log('U3', 'POST', '/api/user/auth/login/', 200, r.status_code,
req_headers=r.request.headers, req_body=u3_body,
resp_headers=dict(r.headers), resp_body=r.json())
if r.status_code == 200:
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}
# U4: verification-code login (registered user; institution info required)
# Request a fresh login code, read it back from the cache, and inject a known
# code if none could be read.
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': PHONE, 'scene': 'login'})
login_code = get_sms_code(PHONE, 'login')
if not login_code:
login_code = inject_sms_code(PHONE, 'login')
u4_body = {'phone': PHONE, 'code': login_code, **_institution_fields()}
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_body)
log('U4', 'POST', '/api/user/auth/login-code/', 200, r.status_code,
req_headers=r.request.headers, req_body=u4_body,
resp_headers=dict(r.headers), resp_body=r.json())
# Keep the previous tokens when the response does not carry new ones.
if r.status_code in (200, 201):
tokens = r.json().get('tokens', {})
access = tokens.get('access', access)
refresh = tokens.get('refresh', refresh)
auth = {'Authorization': f'Bearer {access}'}
# U4-new: verification-code login with auto-registration (new phone number → 201)
# Delete any user with the alternate number first, so the login below sees an unregistered phone.
django_eval(f'from apps.user.models import User; User.objects.filter(phone="{PHONE_ALT}").delete()')
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': PHONE_ALT, 'scene': 'login'})
log('U4-pre', 'POST', '/api/user/auth/send-code/ (alt)', 200, r.status_code,
req_body={'phone': PHONE_ALT, 'scene': 'login'})
# Use the cached code if there is one, otherwise inject a known code.
alt_code = get_sms_code(PHONE_ALT, 'login') or inject_sms_code(PHONE_ALT, 'login')
u4_new_body = {'phone': PHONE_ALT, 'code': alt_code, **_institution_fields()}
r = s.post(f'{BASE}/api/user/auth/login-code/', json=u4_new_body)
u4_new_detail = ''
if r.status_code in (200, 201):
u4_new_detail = f'is_new_user={r.json().get("is_new_user")}'
# Both 200 and 201 are accepted here; the body is logged only when the response is JSON.
log('U4-new', 'POST', '/api/user/auth/login-code/ (auto-register)', [200, 201], r.status_code,
u4_new_detail, req_body=u4_new_body, resp_body=r.json() if r.headers.get('content-type', '').startswith('application/json') else None)
# Remove the auto-registered user again.
django_eval(f'from apps.user.models import User; User.objects.filter(phone="{PHONE_ALT}").delete()')
# U5: reset password
NEW_PASSWORD = 'SwagNew1'
r = s.post(f'{BASE}/api/user/auth/send-code/', json={'phone': PHONE, 'scene': 'reset'})
# Read the reset code back from the cache; inject '111111' if none is found.
reset_code = get_sms_code(PHONE, 'reset')
if not reset_code:
reset_code = inject_sms_code(PHONE, 'reset', '111111')
u5_body = {'phone': PHONE, 'code': reset_code, 'new_password': NEW_PASSWORD}
r = s.post(f'{BASE}/api/user/auth/reset-password/', json=u5_body)
log('U5', 'POST', '/api/user/auth/reset-password/', 200, r.status_code,
req_headers=r.request.headers, req_body=u5_body,
resp_headers=dict(r.headers), resp_body=r.json())
# reset-password calls invalidate_user_tokens(time()+1), so wait over 1s before logging in again
time.sleep(1.2)
# Log in again with the new password (this login is not recorded via log())
r = s.post(f'{BASE}/api/user/auth/login/', json={'phone': PHONE, 'password': NEW_PASSWORD})
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}
# U6: change password
FINAL_PASSWORD = 'SwagFin1'
u6_body = {'old_password': NEW_PASSWORD, 'new_password': FINAL_PASSWORD}
r = s.post(f'{BASE}/api/user/users/change-password/', json=u6_body, headers=auth)
log('U6', 'POST', '/api/user/users/change-password/', 200, r.status_code,
req_headers=r.request.headers, req_body=u6_body,
resp_headers=dict(r.headers), resp_body=r.json())
# change-password likewise calls invalidate_user_tokens(time()+1)
time.sleep(1.2)
# Log in again with the final password (this login is not recorded via log())
r = s.post(f'{BASE}/api/user/auth/login/', json={'phone': PHONE, 'password': FINAL_PASSWORD})
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
refresh = tokens.get('refresh', '')
auth = {'Authorization': f'Bearer {access}'}
# U8: refresh token
u8_body = {'refresh': refresh}
r = s.post(f'{BASE}/api/user/auth/refresh/', json=u8_body)
log('U8', 'POST', '/api/user/auth/refresh/', 200, r.status_code,
req_headers=r.request.headers, req_body=u8_body,
resp_headers=dict(r.headers), resp_body=r.json())
# On success, switch to the refreshed access token (keeping the old one if the field is missing).
if r.status_code == 200:
access = r.json().get('access', access)
auth = {'Authorization': f'Bearer {access}'}
# /me: GET the current user's profile
r = s.get(f'{BASE}/api/user/users/me/', headers=auth)
log('/me', 'GET', '/api/user/users/me/', 200, r.status_code,
f'phone={r.json().get("phone","")}' if r.status_code == 200 else '',
req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
# Remember the current user's id (None when /me failed); the U10-c step uses it.
test_user_id = r.json().get('id') if r.status_code == 200 else None
# ── U9/U10: user list + user detail ──────────────────────────────────────────
# Create admin, teacher and student users plus a teacher-student relation
ADMIN_PHONE = '13700000088'
TEACHER_PHONE = '13700000077'
STUDENT_PHONE = '13700000066'
ROLE_PWD = 'RoleTest1'
# Any users already holding these phone numbers are deleted before the three are created.
django_eval(
f'from apps.user.models import User, TeacherStudentRelation; '
f'User.objects.filter(phone__in=["{ADMIN_PHONE}","{TEACHER_PHONE}","{STUDENT_PHONE}"]).delete(); '
f'admin = User.objects.create_user(username="{ADMIN_PHONE}", password="{ROLE_PWD}", '
f' phone="{ADMIN_PHONE}", real_name="Swagger管理员", role_type="super_admin", status=1); '
f'teacher = User.objects.create_user(username="{TEACHER_PHONE}", password="{ROLE_PWD}", '
f' phone="{TEACHER_PHONE}", real_name="Swagger教师", role_type="teacher", status=1); '
f'student = User.objects.create_user(username="{STUDENT_PHONE}", password="{ROLE_PWD}", '
f' phone="{STUDENT_PHONE}", real_name="Swagger学生", role_type="student", status=1); '
f'TeacherStudentRelation.objects.create(teacher=teacher, student=student, '
f' relation_type="指导", status=1); '
f'print(f"admin={{admin.id}} teacher={{teacher.id}} student={{student.id}}")'
)
# Admin login
r = s.post(f'{BASE}/api/user/auth/login/', json={'phone': ADMIN_PHONE, 'password': ROLE_PWD})
admin_tokens = r.json().get('tokens', {})
admin_auth = {'Authorization': f'Bearer {admin_tokens.get("access", "")}'}
# NOTE(review): admin_refresh is not used anywhere in the visible code.
admin_refresh = admin_tokens.get('refresh', '')
# U9: admin fetches the user list
r = s.get(f'{BASE}/api/user/users/', headers=admin_auth)
u9_detail = ''
if r.status_code == 200:
u9_data = r.json()
# Use the 'results' entry when present, otherwise the payload itself.
u9_results = u9_data.get('results', u9_data)
u9_detail = f'count={len(u9_results)}'
log('U9', 'GET', '/api/user/users/', 200, r.status_code, u9_detail,
req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
# U9-b: teacher fetches the user list (only the teacher's own students)
r_teacher_login = s.post(f'{BASE}/api/user/auth/login/',
json={'phone': TEACHER_PHONE, 'password': ROLE_PWD})
teacher_tokens = r_teacher_login.json().get('tokens', {})
teacher_auth = {'Authorization': f'Bearer {teacher_tokens.get("access", "")}'}
r = s.get(f'{BASE}/api/user/users/', headers=teacher_auth)
u9b_detail = ''
if r.status_code == 200:
u9b_data = r.json()
u9b_results = u9b_data.get('results', u9b_data)
# The expected count of 1 is only reported in the detail text; it is not asserted.
u9b_detail = f'count={len(u9b_results)}(should=1 student only)'
log('U9-b', 'GET', '/api/user/users/ (teacher)', 200, r.status_code, u9b_detail,
req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
# U9-c: ordinary user (the current test user) fetches the list → 403
r = s.get(f'{BASE}/api/user/users/', headers=auth)
log('U9-c', 'GET', '/api/user/users/ (normal)', 403, r.status_code,
f'code={r.json().get("code","")}' if r.status_code == 403 else '',
req_headers=r.request.headers, resp_headers=dict(r.headers), resp_body=r.json())
# U10: admin views the student's detail
# Look up the student id first
student_id = django_eval(
f'from apps.user.models import User; '
f'u = User.objects.get(phone="{STUDENT_PHONE}"); print(u.id)'
)
r = s.get(f'{BASE}/api/user/users/{student_id}/', headers=admin_auth)
log('U10', 'GET', f'/api/user/users/{student_id}/', 200, r.status_code,
f'real_name={r.json().get("real_name","")}' if r.status_code == 200 else '',
resp_body=r.json())
# U10-b: teacher views the detail of one of their own students
r = s.get(f'{BASE}/api/user/users/{student_id}/', headers=teacher_auth)
log('U10-b', 'GET', f'/api/user/users/{student_id}/ (teacher)', 200, r.status_code,
resp_body=r.json())
# U10-c: ordinary user views their own detail
if test_user_id:
r = s.get(f'{BASE}/api/user/users/{test_user_id}/', headers=auth)
log('U10-c', 'GET', f'/api/user/users/{test_user_id}/ (self)', 200, r.status_code,
resp_body=r.json())
# Clean up the helper users
django_eval(
f'from apps.user.models import User; '
f'User.objects.filter(phone__in=["{ADMIN_PHONE}","{TEACHER_PHONE}","{STUDENT_PHONE}"]).delete(); '
f'print("cleaned")'
)
# U7: logout — done last in the user section because it revokes the refresh token
u7_body = {'refresh': refresh}
r = s.post(f'{BASE}/api/user/auth/logout/', json=u7_body, headers=auth)
log('U7', 'POST', '/api/user/auth/logout/', 200, r.status_code,
req_body=u7_body, resp_body=r.json())
# ═══════════════════════════════════════════════════════════════════════════════
section('病例端接口 (C1-C5)')
# ═══════════════════════════════════════════════════════════════════════════════
# Log in again (logout revoked the previous refresh token)
time.sleep(1.2)
r = s.post(f'{BASE}/api/user/auth/login/', json={'phone': PHONE, 'password': FINAL_PASSWORD})
tokens = r.json().get('tokens', {})
access = tokens.get('access', '')
auth = {'Authorization': f'Bearer {access}'}
# Make sure the institution and department exist (same institution_code as U2/U4)
django_eval(
f'from apps.user.models import Institution, Department; '
f'inst, _ = Institution.objects.get_or_create(code="{INST_CODE}", '
f' defaults={{"name":"{INST_NAME}","type":"hospital","province":"北京","city":"北京"}}); '
f'Department.objects.get_or_create(name="{DEPT_NAME}", institution=inst, '
f' defaults={{"category":"临床"}}); '
f'print("OK")'
)
# C1: PDF parsing — uses a real PDF file from the project
# NOTE(review): absolute, machine-specific path; open() raises if the file is missing.
REAL_PDF = r'D:\01Agent\medical_training\儿科 病例样例(SOAP+循证).pdf'
with open(REAL_PDF, 'rb') as f:
r = s.post(
f'{BASE}/api/case/cases/parse-pdf/',
files={'files': ('儿科 病例样例(SOAP+循证).pdf', f, 'application/pdf')},
data={'case_type': 'traditional'},
headers=auth,
)
c1_ok = [200, 500, 429] # 200=AI parse succeeded, 500=AI error, 429=rate limited
detail = ''
c1_resp = None
# Only decode the body when the server says it is JSON.
if r.headers.get('content-type', '').startswith('application/json'):
body = r.json()
c1_resp = body
detail = f'code={body.get("code", "")}'
if r.status_code == 200:
detail = f'parse_id={body.get("parse_id","")}, keys={list(body.get("data",{}).keys())[:5]}'
log('C1', 'POST', '/api/case/cases/parse-pdf/', c1_ok, r.status_code, detail,
req_body={'case_type': 'traditional', 'files': '<PDF file>'}, resp_body=c1_resp)
# C2: generate scoring rules — use the data returned by C1 if it succeeded, otherwise a hand-written payload
c1_data = None
# NOTE(review): `body` is only assigned above when the response was JSON.
if r.status_code == 200:
c1_data = body.get('data', {})
if c1_data:
c1_exam = c1_data.get('exam_items') or []
_write_log(
f' [INFO] C1 解析 exam_items: count={len(c1_exam)}, '
f'codes={[x.get("item_code") for x in c1_exam if isinstance(x, dict)]}'
)
c2_payload = c1_data if c1_data else {
'title': 'Swagger测试病例',
'case_type': 'traditional',
'chief_complaint': '发热3天',
'description': '患儿男4岁发热3天就诊',
'traditional': {
'standard_diagnosis': '上呼吸道感染',
'standard_treatment': '对症治疗',
},
}
r = s.post(f'{BASE}/api/case/cases/generate-scoring-rules/', json=c2_payload, headers=auth)
# As for C1, 500 and 429 are accepted alongside 200.
c2_ok = [200, 500, 429]
detail = ''
scoring_rules_from_ai = None
c2_resp = None
if r.headers.get('content-type', '').startswith('application/json'):
c2_body = r.json()
c2_resp = c2_body
if r.status_code == 200:
scoring_rules_from_ai = c2_body.get('scoring_rules', [])
detail = f'generated={c2_body.get("generated","")}, rules={len(scoring_rules_from_ai)}'
else:
detail = f'code={c2_body.get("code", "")}'
log('C2', 'POST', '/api/case/cases/generate-scoring-rules/', c2_ok, r.status_code, detail,
req_body=c2_payload, resp_body=c2_resp)
# C3: full-create — prefer the C1 + C2 AI results, otherwise use a hand-written payload
if c1_data and scoring_rules_from_ai:
_write_log(' [INFO] C3 使用 C1 AI 解析 + C2 AI 评分规则组装载荷')
payload = {**c1_data}
payload['department_name'] = DEPT_NAME # C1 often returns "儿科"; a duplicate name in the database would cause a 400
payload['scoring_rules'] = scoring_rules_from_ai
else:
_write_log(' [INFO] C3 使用手工表单载荷(C1/C2 未全部成功)')
payload = {
'title': 'Swagger-Try-It-Out-病例',
'case_type': 'traditional',
'difficulty': 'medium',
'chief_complaint': '发热 3 天',
'description': '患儿,男,4 岁,因发热 3 天就诊。',
'patient_age': 4,
'patient_gender': 'male',
'department_name': DEPT_NAME,
'estimated_minutes': 30,
'osce_enabled': False,
'tags': '儿科,发热',
'traditional': {
'standard_diagnosis': '上呼吸道感染',
'standard_treatment': '对症治疗,退热处理',
'guideline_reference': '《儿科学》第 9 版',
},
'scoring_rules': [
{
'dimension': '诊断准确性',
'score_weight': 0.6,
'ai_auto_score': True,
'scoring_standard': '准确判断上呼吸道感染',
},
{
'dimension': '治疗方案',
'score_weight': 0.4,
'ai_auto_score': True,
'scoring_standard': '治疗方案合理',
},
],
'exam_items': SAMPLE_EXAM_ITEMS,
}
# Record what is about to be submitted so it can be compared with the response and the database.
payload_exam_items = payload.get('exam_items') or []
_write_log(
f' [INFO] C3 提交 exam_items: count={len(payload_exam_items)}, '
f'codes={[x.get("item_code") for x in payload_exam_items if isinstance(x, dict)]}'
)
r = s.post(f'{BASE}/api/case/cases/full-create/', json=payload, headers=auth)
c3_resp = r.json() if r.headers.get('content-type', '').startswith('application/json') else None
log('C3', 'POST', '/api/case/cases/full-create/', 201, r.status_code,
req_body=payload, resp_body=c3_resp)
case_id = None
expected_exam_count = len(payload_exam_items)
if r.status_code == 201:
case_id = r.json()['case']['id']
# C3-exam: the response must list as many exam_items as were submitted.
resp_exam = (c3_resp or {}).get('exam_items', [])
resp_exam_count = len(resp_exam)
log(
'C3-exam', 'CHECK', f'/api/case/cases/{case_id}/ (resp exam_items)',
expected_exam_count, resp_exam_count,
f'codes={[e.get("item_code") for e in resp_exam]}',
resp_body={'exam_items': resp_exam} if resp_exam else None,
)
# C3-db: the row count queried through the ORM must match the submitted count.
db_exam_count = count_case_exam_items(case_id)
db_codes = list_case_exam_item_codes(case_id)
log(
'C3-db', 'CHECK', f'case_exam_item case_id={case_id}',
expected_exam_count, db_exam_count,
f'db_codes={db_codes}',
)
else:
_write_log(' SKIP C3-exam/C3-db — C3 未成功')
# C4: GET full — skipped when C3 produced no case_id
if case_id:
r = s.get(f'{BASE}/api/case/cases/{case_id}/full/', headers=auth)
c4_detail = ''
c4_resp = None
if r.headers.get('content-type', '').startswith('application/json'):
c4_resp = r.json()
c4_exam = c4_resp.get('exam_items', [])
c4_detail = f'exam_items={len(c4_exam)}(expect={expected_exam_count})'
log('C4', 'GET', f'/api/case/cases/{case_id}/full/', 200, r.status_code,
c4_detail, resp_body=c4_resp)
# C4-exam: the exam_items count in the JSON response must equal the count submitted in C3.
if c4_resp is not None:
log(
'C4-exam', 'CHECK', f'/api/case/cases/{case_id}/full/ exam_items',
expected_exam_count, len(c4_resp.get('exam_items', [])),
f'codes={[e.get("item_code") for e in c4_resp.get("exam_items", [])]}',
)
else:
_write_log(' SKIP C4 — C3 未返回 case_id')
# C5: PATCH full — update only the title; skipped when C3 produced no case_id
if case_id:
c5_body = {'title': 'Swagger-更新标题'}
r = s.patch(f'{BASE}/api/case/cases/{case_id}/full/', json=c5_body, headers=auth)
log('C5', 'PATCH', f'/api/case/cases/{case_id}/full/', 200, r.status_code,
req_body=c5_body,
resp_body=r.json() if r.headers.get('content-type', '').startswith('application/json') else None)
else:
_write_log(' SKIP C5 — C3 未返回 case_id')
# ═══════════════════════════════════════════════════════════════════════════════
section('汇总')
# ═══════════════════════════════════════════════════════════════════════════════
# Tally the recorded checks by the status stored in the last slot of each result tuple.
statuses = [entry[5] for entry in results]
total = len(results)
passed = statuses.count(PASS)
failed = statuses.count(FAIL)
_write_log(f'\n 总计: {total} 个接口 | 通过: {passed} | 失败: {failed}')
if not failed:
    _write_log('\n ALL PASSED - 全部接口 Swagger Try-it-out 验证通过!')
else:
    # List every failed check with its expected and observed values.
    _write_log('\n 失败接口:')
    for api_id, method, url, expected, actual, status in results:
        if status == FAIL:
            _write_log(f' {api_id} {method} {url} -- expect={expected}, got={actual}')
_write_log(f'\n 日志文件: {LOG_FILE}')
_log_fh.close()
# Exit code 1 when any check failed, 0 otherwise.
sys.exit(1 if failed else 0)