Files
medical_training/test/swagger_cms_case.py
T
2026-06-12 17:19:23 +08:00

516 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
CMS 病例库 + AI 病例生成 + 病例审核 — Swagger Try-it-out 等效脚本(真实 HTTP)。
覆盖 CSV 行 80~98(超级管理员/内容管理员/医院管理员 各角色拆分):
CMS-CASE-1~7、CMS-CASE-AI-1、CMS-AUDIT-1~3
- 详细日志:logs/test-swagger-cms-case-YYYY-MM-DD.log
- 样例 JSONlogs/cms-case-swagger-examples.json(供回填 docx/API - Sheet1.csv
- PDF/AI 需本地 Django + Redis + .env 中 DEEPSEEK_API_KEY;无 Key 时加 --skip-ai 跳过
运行:
.venv\\Scripts\\python.exe test/swagger_cms_case.py
.venv\\Scripts\\python.exe test/swagger_cms_case.py --update-csv
.venv\\Scripts\\python.exe test/swagger_cms_case.py --skip-ai --update-csv
"""
from __future__ import annotations
import argparse
import csv
import io
import json
import os
import subprocess
import sys
from datetime import datetime
from typing import Any
import requests
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
BASE = os.environ.get('SWAGGER_BASE', 'http://127.0.0.1:8000')
PYTHON = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.venv', 'Scripts', 'python.exe')
if not os.path.isfile(PYTHON):
PYTHON = sys.executable
CWD = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(CWD, 'logs')
CSV_PATH = os.path.join(CWD, 'docx', 'API - Sheet1.csv')
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, f'test-swagger-cms-case-{datetime.now():%Y-%m-%d}.log')
EXAMPLES_FILE = os.path.join(LOG_DIR, 'cms-case-swagger-examples.json')
SUPER_PHONE = '13700009001'
CONTENT_PHONE = '13700009002'
HOSPITAL_PHONE = '13700009004'
_fh = open(LOG_FILE, 'a', encoding='utf-8')
examples: dict[str, dict] = {}
results: list[tuple[str, str, int]] = []
def w(text=''):
line = str(text)
_fh.write(line + '\n')
_fh.flush()
print(line)
def django_eval(code: str) -> str:
pre = (
'import django,os;'
'os.environ.setdefault("DJANGO_SETTINGS_MODULE","config.settings");'
'django.setup();'
)
p = subprocess.run(
[PYTHON, '-c', pre + code],
capture_output=True, text=True, cwd=CWD, encoding='utf-8',
)
if p.returncode != 0:
w(f'[django_eval stderr] {p.stderr}')
return (p.stdout or '').strip()
def compact_json(obj: Any, max_len: int = 2800) -> str:
s = json.dumps(obj, ensure_ascii=False, separators=(',', ':'))
if len(s) > max_len:
return s[:max_len] + '...(truncated)'
return s
def case_payload(title: str, institution_id=None, with_exam=False):
p = {
'title': title,
'case_type': 'traditional',
'difficulty': 'medium',
'chief_complaint': '发热 3 天',
'description': '患儿,男,4 岁,因发热 3 天就诊。',
'patient_age': 4,
'patient_gender': 'male',
'department_name': '儿科',
'estimated_minutes': 30,
'osce_enabled': False,
'tags': '儿科,发热',
'traditional': {
'standard_diagnosis': '上呼吸道感染',
'standard_treatment': '对症治疗,退热处理',
'guideline_reference': '《儿科学》第 9 版',
},
'scoring_rules': [
{'dimension': '诊断准确性', 'score_weight': 0.5, 'ai_auto_score': True, 'scoring_standard': '能准确诊断'},
{'dimension': '医患沟通', 'score_weight': 0.5, 'ai_auto_score': False, 'scoring_standard': '沟通到位'},
],
}
if institution_id is not None:
p['institution_id'] = institution_id
if with_exam:
p['exam_items'] = [{
'item_code': 'blood_routine',
'item_name': '血常规',
'item_type': 'lab',
'category': '实验室检查',
'result_text': 'WBC 10×10^9/L',
'is_key': True,
'is_abnormal': False,
'score_weight': 1.0,
'display_order': 1,
}]
return p
def shrink_list_resp(resp: dict) -> dict:
"""列表响应保留 1 条 results 样例,便于 CSV。"""
if not isinstance(resp, dict):
return resp
out = dict(resp)
if isinstance(out.get('results'), list) and out['results']:
out['results'] = [out['results'][0]]
return out
def shrink_full_resp(resp: dict) -> dict:
"""完整病例响应保留关键字段。"""
if not isinstance(resp, dict):
return resp
c = resp.get('case') or {}
return {
'case': {k: c.get(k) for k in (
'id', 'title', 'case_type', 'institution', 'institution_name',
'department', 'department_name', 'publish_status', 'created_by_name', 'created_at',
)},
'traditional': resp.get('traditional'),
'scoring_rules': (resp.get('scoring_rules') or [])[:2],
'exam_items': (resp.get('exam_items') or [])[:1],
}
def shrink_ai_resp(resp: dict) -> dict:
if not isinstance(resp, dict):
return resp
data = resp.get('data') or {}
return {
'parse_id': resp.get('parse_id'),
'case_type': resp.get('case_type'),
'ai_usage': resp.get('ai_usage'),
'prompt_version': resp.get('prompt_version'),
'parsing_seconds': resp.get('parsing_seconds'),
'generating_seconds': resp.get('generating_seconds'),
'data': {k: data.get(k) for k in (
'title', 'case_type', 'chief_complaint', 'department_name', 'patient_age', 'patient_gender',
) if k in data},
}
def call(
csv_key: str,
name: str,
method: str,
path: str,
token: str | None = None,
json_body=None,
params=None,
files=None,
data=None,
expect=200,
shrink=None,
):
"""发起 HTTP 请求,记录日志与 CSV 样例。"""
headers = {'Authorization': f'Bearer {token}'} if token else {}
url = BASE + path
r = requests.request(
method, url, headers=headers, json=json_body, params=params,
files=files, data=data, timeout=120,
)
ct = r.headers.get('content-type', '')
is_json = ct.startswith('application/json')
resp_raw = r.json() if is_json else r.text
resp_store = resp_raw
if shrink == 'list':
resp_store = shrink_list_resp(resp_raw if isinstance(resp_raw, dict) else {})
elif shrink == 'full':
resp_store = shrink_full_resp(resp_raw if isinstance(resp_raw, dict) else {})
elif shrink == 'ai':
resp_store = shrink_ai_resp(resp_raw if isinstance(resp_raw, dict) else {})
elif shrink == 'create':
resp_store = shrink_full_resp(resp_raw if isinstance(resp_raw, dict) else {})
if json_body is not None:
req_example = json.dumps(json_body, ensure_ascii=False)
elif files is not None:
fn = list(files.values())[0][0] if files else 'file.pdf'
extra = f', case_type={data.get("case_type")}' if data else ''
req_example = f'multipart/form-data: files={fn}{extra}'
elif params:
req_example = '?' + '&'.join(f'{k}={v}' for k, v in params.items())
elif method == 'GET' and '{' in path:
req_example = f'路径参数 + 请求头 Authorization: Bearer <token>'
elif method in ('POST',) and json_body is None and files is None:
req_example = '路径参数 id + 请求头 Authorization(无 Body'
else:
req_example = '请求头 Authorization: Bearer <token>'
exp_list = expect if isinstance(expect, (list, tuple)) else [expect]
ok = 'PASS' if r.status_code in exp_list else 'FAIL'
results.append((csv_key, ok, r.status_code))
examples[csv_key] = {
'name': name,
'method': method,
'path': path.lstrip('/'),
'status': r.status_code,
'req': req_example,
'resp': resp_store,
}
w(f'\n[{ok}] {csv_key} {name} -> {method} {path} (status={r.status_code})')
w(f' 请求: {req_example}')
body_str = json.dumps(resp_store, ensure_ascii=False, indent=2) if is_json else str(resp_store)
if len(body_str) > 1800:
body_str = body_str[:1800] + f' ...(截断, 共{len(body_str)}字符)'
w(f' 响应: {body_str}')
return r
def cleanup():
django_eval(
'from apps.case.models import CaseBase; from apps.user.models import User; '
f'CaseBase.all_objects.filter(created_by__phone__in=["{SUPER_PHONE}","{CONTENT_PHONE}"]).hard_delete(); '
f'User.objects.filter(phone__in=["{SUPER_PHONE}","{CONTENT_PHONE}","{HOSPITAL_PHONE}"]).delete(); '
'from apps.user.models import Institution; '
'Institution.all_objects.filter(code__in=["SWG_CASE_A","SWG_CASE_B"]).hard_delete(); '
'print("cleaned")'
)
def setup_tokens():
w('\n[准备] 建临时机构/用户并签发 token ...')
setup = django_eval(
'from apps.user.models import User, Institution, Department; '
'from rest_framework_simplejwt.tokens import RefreshToken; '
'iA,_=Institution.objects.get_or_create(code="SWG_CASE_A", defaults={"name":"病例联调A院","type":"hospital"}); '
'iB,_=Institution.objects.get_or_create(code="SWG_CASE_B", defaults={"name":"病例联调B院","type":"hospital"}); '
'd1,_=Department.objects.get_or_create(name="儿科", defaults={"category":"临床"}); '
'd2,_=Department.objects.get_or_create(name="内科", defaults={"category":"临床"}); '
f'[User.objects.filter(phone=p).delete() for p in ["{SUPER_PHONE}","{CONTENT_PHONE}","{HOSPITAL_PHONE}"]]; '
f'su=User.objects.create_user(username="{SUPER_PHONE}",password=None,phone="{SUPER_PHONE}",'
f'real_name="病例超管",role_type="super_admin",institution=iA,status=1); '
f'cu=User.objects.create_user(username="{CONTENT_PHONE}",password=None,phone="{CONTENT_PHONE}",'
f'real_name="病例内容员",role_type="content_admin",institution=iA,status=1); '
f'hu=User.objects.create_user(username="{HOSPITAL_PHONE}",password=None,phone="{HOSPITAL_PHONE}",'
f'real_name="病例院管",role_type="hospital_admin",institution=iA,status=1); '
'print("|".join([str(RefreshToken.for_user(su).access_token),'
'str(RefreshToken.for_user(cu).access_token),str(RefreshToken.for_user(hu).access_token),'
'str(d1.id),str(d2.id),str(iA.id),str(iB.id)]))'
)
su_tok, cu_tok, hu_tok, d1_id, d2_id, iA_id, iB_id = setup.split('|')
w(f'[准备] 完成 instA={iA_id} instB={iB_id} dept儿科={d1_id} dept内科={d2_id}')
return su_tok, cu_tok, hu_tok, int(d1_id), int(d2_id), int(iA_id), int(iB_id)
def run_tests(skip_ai: bool):
su_tok, cu_tok, hu_tok, d1_id, d2_id, iA_id, iB_id = setup_tokens()
w('\n' + '=' * 90)
w(' 超级管理员 - 病例库')
w('=' * 90)
# 先建病例供后续列表/查看
body_super = case_payload('Swagger超管病例(联调)', with_exam=True)
r = call('82', 'CMS-CASE-3 表单新增(超管)', 'POST', '/api/cms/cases/',
su_tok, json_body=body_super, expect=201, shrink='create')
super_case_id = (r.json().get('case') or {}).get('id')
r_b = call('_b', 'CMS-CASE-3 指定B院(流程用)', 'POST', '/api/cms/cases/',
su_tok, json_body=case_payload('Swagger B院病例', institution_id=iB_id),
expect=201, shrink='create')
b_case_id = (r_b.json().get('case') or {}).get('id')
call('80', 'CMS-CASE-1 病例列表(超管)', 'GET', '/api/cms/cases/',
su_tok, params={'page': 1}, expect=200, shrink='list')
pdf_bytes = (
b'%PDF-1.4\n1 0 obj<</Type/Catalog/Pages 2 0 R>>endobj\n'
b'2 0 obj<</Type/Pages/Kids[3 0 R]/Count 1>>endobj\n'
b'3 0 obj<</Type/Page/MediaBox[0 0 612 792]/Parent 2 0 R/Contents 4 0 R>>endobj\n'
b'4 0 obj<</Length 44>>stream\nBT /F1 12 Tf 100 700 Td (fever 3d) Tj ET\nendstream\nendobj\n'
b'xref\n0 5\ntrailer<</Root 1 0 R>>\nstartxref\n0\n%%EOF'
)
if not skip_ai:
w('\n[AI] PDF 导入 / AI 生成(需 DEEPSEEK_API_KEY...')
call('81', 'CMS-CASE-2 PDF导入(超管)', 'POST', '/api/cms/cases/import-pdf/',
su_tok, files={'files': ('case.pdf', pdf_bytes, 'application/pdf')},
data={'case_type': 'traditional'}, expect=[200, 429, 502, 504], shrink='ai')
call('87', 'CMS-CASE-AI-1 AI生成(超管)', 'POST', '/api/cms/cases/ai-generate/',
su_tok, json_body={
'case_type': 'traditional',
'prompt': '请生成一个儿科急性上呼吸道感染传统病例,患儿4岁男孩,发热咳嗽3天。',
}, expect=[200, 429, 502, 504], shrink='ai')
else:
w('\n[跳过] PDF/AI 接口(--skip-ai')
call('83', 'CMS-CASE-4 编辑关联(超管)', 'POST', f'/api/cms/cases/{super_case_id}/relations/',
su_tok, json_body={'institution_id': iB_id, 'department_id': d2_id}, expect=200)
# 单独草稿用于 submit
r = call('84', 'CMS-CASE-5 提交(超管)', 'POST', f'/api/cms/cases/{super_case_id}/submit/',
su_tok, expect=200)
# 再建一条用于 disable
r2 = call('85', 'CMS-CASE-6 停用(超管)', 'POST', f'/api/cms/cases/{b_case_id}/disable/',
su_tok, expect=200)
call('86', 'CMS-CASE-7 查看(超管)', 'GET', f'/api/cms/cases/{super_case_id}/full/',
su_tok, expect=200, shrink='full')
w('\n' + '=' * 90)
w(' 内容管理员 - 病例库')
w('=' * 90)
body_content = case_payload('Swagger内容员病例(联调)')
r = call('90', 'CMS-CASE-3 表单新增(内容员)', 'POST', '/api/cms/cases/',
cu_tok, json_body=body_content, expect=201, shrink='create')
content_case_id = (r.json().get('case') or {}).get('id')
call('88', 'CMS-CASE-1 病例列表(内容员)', 'GET', '/api/cms/cases/',
cu_tok, params={'page': 1}, expect=200, shrink='list')
if not skip_ai:
call('89', 'CMS-CASE-2 PDF导入(内容员)', 'POST', '/api/cms/cases/import-pdf/',
cu_tok, files={'files': ('case.pdf', pdf_bytes, 'application/pdf')},
data={'case_type': 'traditional'}, expect=[200, 429, 502, 504], shrink='ai')
call('95', 'CMS-CASE-AI-1 AI生成(内容员)', 'POST', '/api/cms/cases/ai-generate/',
cu_tok, json_body={
'case_type': 'traditional',
'prompt': '请生成一个儿科发热传统病例,4岁男孩,发热3天。',
}, expect=[200, 429, 502, 504], shrink='ai')
else:
w('\n[跳过] 内容员 PDF/AI')
call('91', 'CMS-CASE-4 编辑关联(内容员)', 'POST', f'/api/cms/cases/{content_case_id}/relations/',
cu_tok, json_body={'department_id': d2_id}, expect=200)
call('92', 'CMS-CASE-5 提交(内容员)', 'POST', f'/api/cms/cases/{content_case_id}/submit/',
cu_tok, expect=200)
r_dis = call('_dis', '待停用(流程用)', 'POST', '/api/cms/cases/',
cu_tok, json_body=case_payload('Swagger待停用病例'), expect=201, shrink='create')
disable_id = (r_dis.json().get('case') or {}).get('id')
call('93', 'CMS-CASE-6 停用(内容员)', 'POST', f'/api/cms/cases/{disable_id}/disable/',
cu_tok, expect=200)
call('94', 'CMS-CASE-7 查看(内容员)', 'GET', f'/api/cms/cases/{content_case_id}/full/',
cu_tok, expect=200, shrink='full')
w('\n' + '=' * 90)
w(' 医院管理员 - 病例审核')
w('=' * 90)
call('96', 'CMS-AUDIT-1 待审核列表', 'GET', '/api/cms/cases/',
hu_tok, params={'publish_status': 1, 'page': 1}, expect=200, shrink='list')
call('97', 'CMS-AUDIT-2 查看病例', 'GET', f'/api/cms/cases/{content_case_id}/full/',
hu_tok, expect=200, shrink='full')
call('98', 'CMS-AUDIT-3 发布', 'POST', f'/api/cms/cases/{content_case_id}/publish/',
hu_tok, expect=200)
# 各行「说明」原文(CSV 第 9 列,update 时保持不变)
ROW_NOTES = {
'80': '仅超级管理员。返回全平台病例(可用?institution=按机构过滤)。已软删(下架)不返回。不可发布(归医院管理员 CMS-AUDIT-3)。未登录401;非超管403',
'81': '仅超级管理员。解析预览不落库,前端审核后调 CMS-CASE-3 入库时可传 institution_id 指定任意机构。case_type非法→400;非multipart→415;限流429;AI异常502/504;非超管403',
'82': '仅超级管理员。落库为草稿(publish_status=0),可指定任意 institution_id。错误:CASE_TYPE_NOT_SUPPORTED/CASE_SUBTYPE_REQUIRED/CASE_SUBTYPE_CONFLICT/CASE_VALIDATION_ERROR/CASE_FIELD_NOT_ALLOWED/CASE_INSTITUTION_NOT_FOUND/CASE_DEPARTMENT_NOT_FOUND/CASE_EXAM_ITEM_DUPLICATE;非超管403',
'83': '仅超级管理员。可改全平台任意病例的所属机构/科室,不改病例内容。三者都没传→400 CASE_VALIDATION_ERROR;机构/科室不存在→400;病例不存在/已软删→404;非超管403',
'84': '仅超级管理员。全平台任意病例:草稿(0)→正常(1),进入对应机构医院管理员审核队列。非草稿→400 CASE_NOT_SUBMITTABLE;病例不存在/已软删→404;非超管403',
'85': '仅超级管理员。全平台任意病例停用=下架=软删除(is_deleted=1,不物删)。下架后默认列表/查看不返回;病例不存在/已软删→404;非超管403',
'86': '仅超级管理员。可查看全平台任意病例完整结构。病例不存在/已软删→404;非超管403',
'87': '仅超级管理员。DeepSeek按病例模板(prompts/case_{type}_full.md)生成,不落库;前端审核后走 CMS-CASE-3 入库时可指定 institution_id。prompt空→400;限流429;AI异常500/502/504;非超管403',
'88': '仅内容管理员。仅返回本院(institution=登录用户所属机构)病例;他院不可见。?institution 参数无效(后端强制本院)。已软删不返回。不可审核发布(归医院管理员)。未登录401;非内容管理员403',
'89': '仅内容管理员。解析预览不落库,前端审核后调 CMS-CASE-3 入库时 institution 强制本院(忽略 institution_id)。case_type非法→400;非multipart→415;限流429;AI异常502/504;非内容管理员403',
'90': '仅内容管理员。新建强制 institution=本院(传 institution_id 亦忽略)。落库为草稿(publish_status=0)。错误同 CMS-CASE-3;非内容管理员403',
'91': '仅内容管理员。仅能操作本院病例;可改科室,机构锁定本院(传 institution_id 无效或不可改他院)。不改病例内容。他院病例→404;非内容管理员403',
'92': '仅内容管理员。仅本院病例:草稿(0)→正常(1),进入本院医院管理员审核队列。非草稿→400 CASE_NOT_SUBMITTABLE;他院/不存在/已软删→404;非内容管理员403',
'93': '仅内容管理员。仅本院病例停用=下架=软删除(is_deleted=1)。下架后默认列表/查看不返回;他院/不存在/已软删→404;非内容管理员403',
'94': '仅内容管理员。仅可查看本院病例完整结构。他院/不存在/已软删→404;非内容管理员403',
'95': '仅内容管理员。DeepSeek按病例模板生成,不落库;前端审核后走 CMS-CASE-3 入库时强制落本院。prompt空→400;限流429;AI异常500/502/504;非内容管理员403',
'96': '仅医院管理员。本院 publish_status=1(正常/待审核)病例列表,即 CMS-CASE-1 加 publish_status=1 且后端强制本院。超管/内容管理员请用各自病例列表接口。未登录401;非医院管理员403',
'97': '仅医院管理员。审核前查看本院待审核病例完整内容。他院/不存在/已软删→404;非医院管理员403',
'98': '仅医院管理员(超管/内容管理员均403)。本院病例:正常(1)→已发布(2),发布后对本院移动端医学生可见。非正常→400 CASE_NOT_PUBLISHABLE;他院/不存在/已软删→404',
}
# 各行 params 字段说明前缀(不含「实际示例」)
ROW_PARAMS_BASE = {
'80': '查询参数(均可选)search(标题/主诉/标签/ICD)、case_type、publish_status(0草稿/1正常/2已发布)、institution(机构ID,可按任意机构过滤)、department(科室ID)、status、osce_enabled、ordering、page;请求头:Authorization',
'81': '请求体(multipart/form-data)files=1~5份.pdf、case_type=traditional|teaching;请求头:Authorization',
'82': '请求体(JSON)title*、case_type*(traditional|teaching)、institution_id(可选,指定落库机构;缺省落创建者机构)、department_name、子表traditional|teaching*、scoring_rules*(≥1dimension必填、score_weight∈(0,1])、exam_items(可选,item_code不重复)、difficulty/chief_complaint/description/patient_age/patient_gender/tags/icd_codes/estimated_minutes/osce_enabled…;不接收stages;请求头:Authorization',
'83': '路径参数:id;请求体(JSON,至少一项)institution_id(机构IDnull清空,可改任意机构)、department_id(科室IDnull清空) 或 department_name;请求头:Authorization',
'84': '路径参数:id(无Body);请求头:Authorization',
'85': '路径参数:id(无Body);请求头:Authorization',
'86': '路径参数:id;请求头:Authorization',
'87': '请求体(JSON)prompt*(病例长描述)、case_type*(traditional|teaching);请求头:Authorization',
'88': '查询参数(均可选)search、case_type、publish_status(0草稿/1正常/2已发布)、department(科室ID)、status、osce_enabled、ordering、page;请求头:Authorization',
'89': '请求体(multipart/form-data)files=1~5份.pdf、case_type=traditional|teaching;请求头:Authorization',
'90': '请求体(JSON)title*、case_type*(traditional|teaching)、department_name、子表traditional|teaching*、scoring_rules*(≥1dimension必填、score_weight∈(0,1])、exam_items(可选,item_code不重复)、difficulty/chief_complaint/description/patient_age/patient_gender/tags/icd_codes/estimated_minutes/osce_enabled…;不接收institution_id/stages;请求头:Authorization',
'91': '路径参数:id;请求体(JSON,至少一项)department_id(科室IDnull清空) 或 department_name;请求头:Authorization',
'92': '路径参数:id(无Body);请求头:Authorization',
'93': '路径参数:id(无Body);请求头:Authorization',
'94': '路径参数:id;请求头:Authorization',
'95': '请求体(JSON)prompt*(病例长描述)、case_type*(traditional|teaching);请求头:Authorization',
'96': '查询参数:publish_status=1(正常=待审核)、search、department、page;请求头:Authorization',
'97': '路径参数:id;请求头:Authorization',
'98': '路径参数:id(无Body);请求头:Authorization',
}
def update_csv():
"""用 examples 回填 CSV 第 80~98 行的 params / response 列。"""
if not os.path.isfile(EXAMPLES_FILE):
w(f'样例文件不存在: {EXAMPLES_FILE}')
return False
with open(EXAMPLES_FILE, encoding='utf-8') as f:
ex = json.load(f)
rows = []
with open(CSV_PATH, encoding='utf-8', newline='') as f:
rows = list(csv.reader(f))
updated = 0
for i, row in enumerate(rows):
if not row or row[0] not in ex:
continue
row_num = row[0]
e = ex[row_num]
method = e.get('method', row[6] if len(row) > 6 else 'GET')
base_params = ROW_PARAMS_BASE.get(row_num, row[7] if len(row) > 7 else '')
req = e.get('req', '')
if method == 'GET' and req.startswith('?'):
new_params = f'{base_params} 实际示例:{method} /{e["path"]}{req}'
elif req.startswith('multipart'):
new_params = f'{base_params} 实际示例:{req}'
elif req.startswith('{') or req.startswith('['):
new_params = f'{base_params} 实际示例:{req}'
elif '路径参数' in req or '无 Body' in req or 'Bearer' in req:
new_params = f'{base_params} 实际示例:{method} /{e["path"]}'
else:
new_params = f'{base_params} 实际示例:{req}'
new_resp = f'实际返回:{compact_json(e.get("resp", ""))}'
note = ROW_NOTES.get(row_num, row[9] if len(row) > 9 else '')
rows[i] = row[:7] + [new_params, new_resp, note, '1', row[11] if len(row) > 11 else '0']
updated += 1
w(f' CSV 行 #{row_num} 已更新 ({e.get("name")})')
with open(CSV_PATH, 'w', encoding='utf-8', newline='') as f:
csv.writer(f, quoting=csv.QUOTE_MINIMAL).writerows(rows)
w(f'\nCSV 更新完成:{updated} 行 -> {CSV_PATH}')
return updated > 0
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--skip-ai', action='store_true', help='跳过 PDF/AI(无 DeepSeek Key 时)')
parser.add_argument('--update-csv', action='store_true', help='测试后回填 docx/API - Sheet1.csv')
parser.add_argument('--csv-only', action='store_true', help='仅从已有 examples JSON 更新 CSV')
args = parser.parse_args()
w('=' * 90)
w(f' CMS 病例 Swagger 测试 {datetime.now():%Y-%m-%d %H:%M:%S}')
w(f' BASE={BASE}')
w('=' * 90)
if args.csv_only:
update_csv()
_fh.close()
return 0
try:
run_tests(skip_ai=args.skip_ai)
finally:
w('\n[清理] 删除测试病例与用户 ...')
cleanup()
with open(EXAMPLES_FILE, 'w', encoding='utf-8') as f:
json.dump(examples, f, ensure_ascii=False, indent=2)
w('\n' + '=' * 90)
total = len(results)
passed = sum(1 for _, ok, _ in results if ok == 'PASS')
w(f' 总计 {total} | 通过 {passed} | 失败 {total - passed}')
fails = [(c, s) for c, ok, s in results if ok == 'FAIL']
if fails:
w(' 失败: ' + ', '.join(f'{c}({s})' for c, s in fails))
w(f' 日志: {LOG_FILE}')
w(f' 样例: {EXAMPLES_FILE}')
if args.update_csv:
update_csv()
_fh.close()
return 0 if not fails else 1
if __name__ == '__main__':
raise SystemExit(main())