diff --git a/apps/case/cms.py b/apps/case/cms.py new file mode 100644 index 0000000..f1e9920 --- /dev/null +++ b/apps/case/cms.py @@ -0,0 +1,229 @@ +"""CMS 病例库 + AI 病例生成 + 病例审核。 + +挂载于 `/api/cms/cases/`,仅 GET/POST(CMS v0.6 约定:查=GET,增删改=POST)。三类角色共用: +- **超级管理员**:全平台病例,建/改/导入/AI生成/编辑关联/提交/停用;**不做病例审核发布**。 +- **内容管理员**:仅本机构(`institution`)病例,可建/导入/AI生成/编辑关联/提交/停用;不审核发布。 +- **医院管理员**:仅本机构病例,做**病例审核**——查看 + 发布(正常 → 已发布);不建/改。 + +按设计文档实现(已为 `case_base` 加 `institution` 外键、`is_deleted` 软删除): +- 数据范围以 `institution` 收口(内容/医院管理员仅本院)。 +- 病例状态机 `publish_status`:0 草稿 →[提交]→ 1 正常 →[医院管理员发布]→ 2 已发布。 +- 停用/下架 = **软删除**(`is_deleted=1`,默认管理器自动过滤);编辑关联可改所属**机构 + 科室**。 +""" +import logging + +from django_filters.rest_framework import DjangoFilterBackend +from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiResponse, inline_serializer +from rest_framework import viewsets, filters, status, serializers as drf_serializers +from rest_framework.decorators import action +from rest_framework.parsers import MultiPartParser +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +from config.exceptions import AppError +from apps.cms.permissions import ( + IsSuperContentOrHospitalAdmin, is_super, is_content_admin, is_hospital_admin, +) +from apps.user.models import Institution, Department +from apps.user.throttling import PdfParseUserThrottle +from .models import CaseBase +from .serializers import CaseBaseListSerializer +from .services import case_importer +from .services.case_writer import create_case_full, build_full_response + +audit = logging.getLogger('audit') + + +@extend_schema_view( + list=extend_schema(summary='CMS-CASE-1 病例列表', tags=['CMS-病例']), + create=extend_schema(summary='CMS-CASE-3 表单新增病例(草稿)', tags=['CMS-病例']), +) +class CmsCaseViewSet(viewsets.ModelViewSet): + """CMS 病例库 + 审核。超管全平台、内容/医院管理员仅本机构(`institution`)。""" + serializer_class = CaseBaseListSerializer + permission_classes = [IsAuthenticated, IsSuperContentOrHospitalAdmin] + filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter] + filterset_fields = ['case_type', 'publish_status', 'department', 'institution', 'status', 'osce_enabled'] + search_fields = ['title', 'chief_complaint', 'tags', 'icd_codes'] + ordering_fields = ['created_at', 'difficulty_score', 'estimated_minutes'] + # 仅 GET / POST:查=GET,增删改=POST(编辑关联→{id}/relations/,停用→{id}/disable/,发布→{id}/publish/) + http_method_names = ['get', 'post', 'head', 'options'] + + def get_queryset(self): + # 默认管理器已过滤软删(is_deleted=False) + qs = CaseBase.objects.select_related('institution', 'department', 'created_by').all().order_by('-created_at') + user = self.request.user + if is_super(user): + return qs + # 内容管理员 / 医院管理员:仅本机构 + return qs.filter(institution_id=user.institution_id) + + # ── 角色守卫 ───────────────────────────────────────────────────────── + def _require_editor(self): + """建/改/导入/AI生成/提交/停用:仅超管或内容管理员。""" + if not (is_super(self.request.user) or is_content_admin(self.request.user)): + raise AppError('CMS_PERMISSION_DENIED', '仅超级管理员 / 内容管理员可操作病例内容', status_code=403) + + def _require_publisher(self): + """审核发布:仅医院管理员(超级管理员不做病例审核)。""" + if not is_hospital_admin(self.request.user): + raise AppError('CMS_PERMISSION_DENIED', '病例审核发布仅医院管理员可操作', status_code=403) + + def _resolve_institution(self): + """新建病例的所属机构:内容管理员强制本院;超管可传 institution_id,缺省落本院。""" + user = self.request.user + if is_content_admin(user): + return user.institution + inst_id = self.request.data.get('institution_id') + if inst_id in (None, ''): + return user.institution + inst = Institution.objects.filter(id=inst_id).first() + if not inst: + raise AppError('CASE_INSTITUTION_NOT_FOUND', f'机构 id={inst_id} 不存在', status_code=400) + return inst + + # ── CMS-CASE-3 表单新增(草稿)/ CMS-CASE-1 列表 ────────────────────── + def create(self, request, *args, **kwargs): + self._require_editor() + case, n_rules, n_items = create_case_full(request.data, request.user, institution=self._resolve_institution()) + audit.info('CMS_CASE_CREATE case_id=%s by=%s inst=%s scoring_rules=%d exam_items=%d', + case.id, request.user.id, case.institution_id, n_rules, n_items) + return Response(build_full_response(case), status=status.HTTP_201_CREATED) + + # list 用默认实现(默认管理器已排除软删;医院管理员审核用 ?publish_status=1 取待审核) + + # ── CMS-CASE-7 / CMS-AUDIT-2 病例查看(完整结构)───────────────────── + @extend_schema(summary='CMS-CASE-7 病例查看(完整结构)', tags=['CMS-病例']) + @action(detail=True, methods=['get']) + def full(self, request, pk=None): + return Response(build_full_response(self.get_object())) + + # ── CMS-CASE-2 PDF 导入(解析预览,不落库)──────────────────────────── + @extend_schema( + summary='CMS-CASE-2 PDF 导入(解析预览)', + request={'multipart/form-data': {'type': 'object', 'properties': { + 'files': {'type': 'array', 'items': {'type': 'string', 'format': 'binary'}}, + 'case_type': {'type': 'string', 'enum': ['traditional', 'teaching']}, + }, 'required': ['files', 'case_type']}}, + responses={200: OpenApiResponse(description='解析结果(parse_id + data)')}, + tags=['CMS-病例'], + ) + @action(detail=False, methods=['post'], url_path='import-pdf', + parser_classes=[MultiPartParser], throttle_classes=[PdfParseUserThrottle]) + def import_pdf(self, request): + """解析 PDF → 结构化病例数据(不落库;前端审核后调 CMS-CASE-3 入库)。""" + self._require_editor() + files = request.FILES.getlist('files') + case_type = request.data.get('case_type', '') + return Response(case_importer.parse_pdf(files, case_type, request.user)) + + # ── CMS-CASE-AI-1 AI 生成病例内容(不落库)─────────────────────────── + @extend_schema( + summary='CMS-CASE-AI-1 AI 生成病例内容', + request=inline_serializer('CmsCaseAiGenerateRequest', fields={ + 'prompt': drf_serializers.CharField(help_text='病例长描述 prompt'), + 'case_type': drf_serializers.ChoiceField(choices=['traditional', 'teaching']), + }), + responses={200: OpenApiResponse(description='生成结果(parse_id + data)')}, + tags=['CMS-病例'], + ) + @action(detail=False, methods=['post'], url_path='ai-generate', + throttle_classes=[PdfParseUserThrottle]) + def ai_generate(self, request): + """一段长 prompt → DeepSeek 按病例模板生成内容(与 PDF 导入同构,不落库)。""" + self._require_editor() + prompt = request.data.get('prompt', '') + case_type = request.data.get('case_type', '') + return Response(case_importer.generate_from_prompt(prompt, case_type, request.user)) + + # ── CMS-CASE-4 编辑关联信息(改所属机构 / 科室)────────────────────── + @extend_schema( + summary='CMS-CASE-4 编辑关联信息(改所属机构 / 科室)', + request=inline_serializer('CmsCaseRelationsRequest', fields={ + 'institution_id': drf_serializers.IntegerField(required=False, allow_null=True), + 'department_id': drf_serializers.IntegerField(required=False, allow_null=True), + 'department_name': drf_serializers.CharField(required=False), + }), + tags=['CMS-病例'], + ) + @action(detail=True, methods=['post']) + def relations(self, request, pk=None): + """改病例「属于哪个机构 / 科室」,不改病例内容。""" + self._require_editor() + case = self.get_object() + data = request.data + changed = [] + if 'institution_id' in data: + inst_id = data.get('institution_id') + if inst_id in (None, ''): + case.institution = None + else: + inst = Institution.objects.filter(id=inst_id).first() + if not inst: + raise AppError('CASE_INSTITUTION_NOT_FOUND', f'机构 id={inst_id} 不存在', status_code=400) + case.institution = inst + changed.append('institution') + if 'department_id' in data: + dept_id = data.get('department_id') + if dept_id in (None, ''): + case.department = None + else: + dept = Department.objects.filter(id=dept_id).first() + if not dept: + raise AppError('CASE_DEPARTMENT_NOT_FOUND', f'科室 id={dept_id} 不存在', status_code=400) + case.department = dept + changed.append('department') + elif 'department_name' in data: + from .services.department_resolver import resolve_department + case.department = resolve_department(data.get('department_name', '')) + changed.append('department') + if not changed: + raise AppError('CASE_VALIDATION_ERROR', '请提供 institution_id 或 department_id/department_name', status_code=400) + case.save(update_fields=changed + ['updated_at']) + audit.info('CMS_CASE_RELATIONS case_id=%s by=%s inst=%s dept=%s', + case.id, request.user.id, case.institution_id, case.department_id) + return Response({ + 'message': '关联信息已更新', 'id': case.id, + 'institution': case.institution_id, + 'institution_name': case.institution.name if case.institution else None, + 'department': case.department_id, + 'department_name': case.department.name if case.department else None, + }) + + # ── CMS-CASE-5 提交(草稿 → 正常)──────────────────────────────────── + @extend_schema(summary='CMS-CASE-5 提交(草稿 → 正常)', tags=['CMS-病例']) + @action(detail=True, methods=['post']) + def submit(self, request, pk=None): + self._require_editor() + case = self.get_object() + if case.publish_status != 0: + raise AppError('CASE_NOT_SUBMITTABLE', '仅草稿可提交', status_code=400) + case.publish_status = 1 # 正常(待医院管理员审核发布) + case.save(update_fields=['publish_status', 'updated_at']) + audit.info('CMS_CASE_SUBMIT case_id=%s by=%s', case.id, request.user.id) + return Response({'message': '已提交', 'id': case.id, 'publish_status': case.publish_status}) + + # ── CMS-CASE-6 停用 / 重录(下架 = 软删除)────────────────────────── + @extend_schema(summary='CMS-CASE-6 停用 / 重录(下架)', tags=['CMS-病例']) + @action(detail=True, methods=['post']) + def disable(self, request, pk=None): + """停用 = 下架 = 软删除(is_deleted=1),不物理删除。""" + self._require_editor() + case = self.get_object() + case.delete() # SoftDeleteModel:置 is_deleted=True, deleted_at=now + audit.info('CMS_CASE_DISABLE case_id=%s by=%s', case.id, request.user.id) + return Response({'message': '已下架', 'id': case.id}) + + # ── CMS-AUDIT-3 发布(正常 → 已发布,仅医院管理员审核)─────────────── + @extend_schema(summary='CMS-AUDIT-3 发布(正常 → 已发布)', tags=['CMS-病例审核']) + @action(detail=True, methods=['post']) + def publish(self, request, pk=None): + """医院管理员审核发布:正常(1) → 已发布(2)。发布后对本院移动端医学生可见。超管不做审核。""" + self._require_publisher() + case = self.get_object() + if case.publish_status != 1: + raise AppError('CASE_NOT_PUBLISHABLE', '仅「正常」状态病例可发布', status_code=400) + case.publish_status = 2 # 已发布 + case.save(update_fields=['publish_status', 'updated_at']) + audit.info('CMS_CASE_PUBLISH case_id=%s by=%s', case.id, request.user.id) + return Response({'message': '已发布', 'id': case.id, 'publish_status': case.publish_status}) diff --git a/apps/case/cms_urls.py b/apps/case/cms_urls.py new file mode 100644 index 0000000..b261b30 --- /dev/null +++ b/apps/case/cms_urls.py @@ -0,0 +1,11 @@ +from django.urls import path, include +from rest_framework.routers import DefaultRouter + +from .cms import CmsCaseViewSet + +router = DefaultRouter() +router.register(r'cases', CmsCaseViewSet, basename='cms-case') + +urlpatterns = [ + path('', include(router.urls)), +] diff --git a/apps/case/migrations/0004_casebase_deleted_at_casebase_institution_and_more.py b/apps/case/migrations/0004_casebase_deleted_at_casebase_institution_and_more.py new file mode 100644 index 0000000..1bb62a0 --- /dev/null +++ b/apps/case/migrations/0004_casebase_deleted_at_casebase_institution_and_more.py @@ -0,0 +1,35 @@ +# Generated by Django 5.2.14 on 2026-06-12 07:57 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('case', '0003_case_exam_item'), + ('user', '0007_user_deleted_at_user_is_deleted'), + ] + + operations = [ + migrations.AddField( + model_name='casebase', + name='deleted_at', + field=models.DateTimeField(blank=True, null=True, verbose_name='删除时间'), + ), + migrations.AddField( + model_name='casebase', + name='institution', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='cases', to='user.institution', verbose_name='所属机构'), + ), + migrations.AddField( + model_name='casebase', + name='is_deleted', + field=models.BooleanField(db_index=True, default=False, verbose_name='是否删除'), + ), + migrations.AlterField( + model_name='casebase', + name='publish_status', + field=models.SmallIntegerField(choices=[(0, '草稿'), (1, '正常'), (2, '已发布')], default=0, verbose_name='发布状态'), + ), + ] diff --git a/apps/case/models.py b/apps/case/models.py index de40287..bfa211b 100644 --- a/apps/case/models.py +++ b/apps/case/models.py @@ -1,10 +1,10 @@ from django.db import models -from apps.common.models import BaseModel +from apps.common.models import BaseModel, SoftDeleteModel from apps.user.models import User -class CaseBase(BaseModel): - """病例主表""" +class CaseBase(SoftDeleteModel): + """病例主表(软删除:`is_deleted=1` 即「已下架」;`objects` 默认过滤已删)。""" CASE_TYPE_CHOICES = [ ('traditional', '传统病例'), ('script', '剧本病例'), @@ -15,10 +15,11 @@ class CaseBase(BaseModel): (0, '禁用'), (1, '正常'), ] + # 病例状态机:草稿 →[提交]→ 正常 →[医院管理员发布]→ 已发布;下架 = 软删除(is_deleted=1) PUBLISH_STATUS_CHOICES = [ (0, '草稿'), - (1, '已发布'), - (2, '已下架'), + (1, '正常'), + (2, '已发布'), ] id = models.BigAutoField(primary_key=True) @@ -26,6 +27,10 @@ class CaseBase(BaseModel): case_type = models.CharField('病例类型', max_length=30, choices=CASE_TYPE_CHOICES) difficulty = models.CharField('难度', max_length=20, blank=True) difficulty_score = models.IntegerField('AI难度评分', null=True, blank=True) + institution = models.ForeignKey( + 'user.Institution', on_delete=models.SET_NULL, + null=True, blank=True, related_name='cases', verbose_name='所属机构' + ) department = models.ForeignKey( 'user.Department', on_delete=models.SET_NULL, null=True, blank=True, verbose_name='所属科室' diff --git a/apps/case/serializers.py b/apps/case/serializers.py index 88fad70..41bf58d 100644 --- a/apps/case/serializers.py +++ b/apps/case/serializers.py @@ -6,6 +6,7 @@ from .models import ( class CaseBaseListSerializer(serializers.ModelSerializer): + institution_name = serializers.CharField(source='institution.name', read_only=True) department_name = serializers.CharField(source='department.name', read_only=True) created_by_name = serializers.CharField(source='created_by.real_name', read_only=True) @@ -13,7 +14,8 @@ class CaseBaseListSerializer(serializers.ModelSerializer): model = CaseBase fields = [ 'id', 'title', 'case_type', 'difficulty', 'difficulty_score', - 'department', 'department_name', 'chief_complaint', 'patient_age', + 'institution', 'institution_name', 'department', 'department_name', + 'chief_complaint', 'patient_age', 'patient_gender', 'tags', 'estimated_minutes', 'osce_enabled', 'publish_status', 'status', 'created_by_name', 'created_at', 'updated_at' ] diff --git a/apps/case/services/case_importer.py b/apps/case/services/case_importer.py index 6c4cf22..8cc7dd4 100644 --- a/apps/case/services/case_importer.py +++ b/apps/case/services/case_importer.py @@ -72,6 +72,54 @@ def parse_pdf(files, case_type: str, user) -> dict: } +def generate_from_prompt(prompt: str, case_type: str, user) -> dict: + """CMS-CASE-AI-1: 一段长 prompt → DeepSeek 按病例模板生成结构化病例内容。 + + 产出与 parse_pdf 同构(不落库、不含评分规则),缓存 parse_id 供后续 full-create 复用。 + """ + if case_type not in ('traditional', 'teaching'): + raise AppError('CASE_TYPE_NOT_SUPPORTED', f'case_type 不支持: {case_type}', status_code=400) + if not (prompt or '').strip(): + raise AppError('CASE_VALIDATION_ERROR', 'prompt 必填', status_code=400) + + t0 = time.time() + + prompt_name = f'case_{case_type}_full' + system_prompt, prompt_version = load_prompt(prompt_name) + + result = deepseek_client.call_deepseek(system_prompt, prompt) + data = result['data'] + + data.pop('scoring_rules', None) + data.pop('stages', None) + + data['case_type'] = case_type + if 'exam_items' in data: + data['exam_items'] = normalize_exam_items(data.get('exam_items') or []) + else: + data['exam_items'] = [] + + _strip_unknown_fields(data) + _validate_schema(data) + + parse_id = uuid.uuid4().hex[:12] + cache.set(f'parse_result:{parse_id}', json.dumps(data, ensure_ascii=False), PARSE_RESULT_TTL) + + audit.info( + 'CASE_AI_GENERATE user=%s parse_id=%s tokens=%s prompt_version=%s', + user.id, parse_id, result.get('usage', {}), prompt_version, + ) + + return { + 'parse_id': parse_id, + 'case_type': case_type, + 'ai_usage': result.get('usage', {}), + 'prompt_version': prompt_version, + 'generating_seconds': round(time.time() - t0, 1), + 'data': data, + } + + _SCHEMA_ALLOWED_KEYS = { 'title', 'case_type', 'difficulty', 'chief_complaint', 'description', 'patient_age', 'patient_gender', 'tags', 'symptom_tags', 'disease_tags', diff --git a/apps/case/services/case_writer.py b/apps/case/services/case_writer.py new file mode 100644 index 0000000..30777f5 --- /dev/null +++ b/apps/case/services/case_writer.py @@ -0,0 +1,216 @@ +"""病例落库 / 完整结构组装的共享逻辑。 + +`/api/case/cases/full-create`(业务域)与 `/api/cms/cases/`(CMS 内容管理)共用同一套 +创建与序列化逻辑,避免两处实现漂移。 +""" +from django.db import transaction + +from config.exceptions import AppError +from ..models import ( + CaseBase, TraditionalCase, TeachingCase, ScoringRule, CaseExamItem, +) +from .department_resolver import resolve_department +from .exam_items import ( + normalize_exam_items, assert_no_duplicate_exam_items, EXAM_ITEM_FIELDS, +) + +TRADITIONAL_FIELDS = {'standard_diagnosis', 'standard_treatment', 'guideline_reference'} +TEACHING_FIELDS = {'teaching_goal', 'discussion_questions', 'teacher_guide', 'scoring_focus'} +SCORING_RULE_FIELDS = { + 'dimension', 'competency_dimension', 'score_weight', + 'ai_auto_score', 'osce_dimension', 'scoring_standard', 'rubric_json', +} +CASE_BASE_FIELDS = { + 'title', 'case_type', 'difficulty', 'difficulty_score', + 'chief_complaint', 'description', 'patient_age', 'patient_gender', + 'tags', 'symptom_tags', 'disease_tags', 'competency_tags', + 'guideline_tags', 'knowledge_points', 'icd_codes', + 'estimated_minutes', 'osce_enabled', 'rag_enabled', + 'ai_prompt_template', 'multimodal_assets', +} + + +def validate_scoring_rules(rules): + if not isinstance(rules, list): + raise AppError('CASE_VALIDATION_ERROR', 'scoring_rules 必须为数组', status_code=400) + for i, rule in enumerate(rules): + if not isinstance(rule, dict): + raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}] 必须为对象', status_code=400) + if not rule.get('dimension'): + raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].dimension 必填', status_code=400) + weight = rule.get('score_weight') + if weight is not None: + try: + weight = float(weight) + except (TypeError, ValueError): + raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].score_weight 须为数字', status_code=400) + if weight <= 0 or weight > 1: + raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].score_weight 须在 (0, 1]', status_code=400) + + +_UNSET = object() + + +def create_case_full(data, user, institution=_UNSET): + """主表 + 子表 + scoring_rules(≥1) + exam_items 同一事务入库。 + + institution:病例所属机构;默认取创建者所属机构(`user.institution`)。 + 返回 (case, scoring_rule_count, exam_item_count)。校验失败抛 AppError。 + """ + if institution is _UNSET: + institution = getattr(user, 'institution', None) + case_type = data.get('case_type', '') + if case_type not in ('traditional', 'teaching'): + raise AppError('CASE_TYPE_NOT_SUPPORTED', f'case_type 不支持: {case_type}', status_code=400) + + if 'stages' in data: + raise AppError('CASE_FIELD_NOT_ALLOWED', '本接口不接收 stages 字段', status_code=400) + + sub_data = data.get(case_type) + other_type = 'teaching' if case_type == 'traditional' else 'traditional' + if not sub_data: + raise AppError('CASE_SUBTYPE_REQUIRED', f'{case_type} 子表数据缺失', status_code=400) + if data.get(other_type): + raise AppError('CASE_SUBTYPE_CONFLICT', '不允许同时传入两种子表数据', status_code=400) + + scoring_rules_data = data.get('scoring_rules', []) + if not scoring_rules_data: + raise AppError('CASE_VALIDATION_ERROR', 'scoring_rules 必填且至少 1 条', status_code=400) + validate_scoring_rules(scoring_rules_data) + + exam_items_data = normalize_exam_items(data.get('exam_items') or []) + assert_no_duplicate_exam_items(exam_items_data) + + department = resolve_department(data.get('department_name', '')) + + with transaction.atomic(): + case_kwargs = {k: data[k] for k in CASE_BASE_FIELDS if k in data} + case_kwargs['department'] = department + case_kwargs['institution'] = institution + case_kwargs['created_by'] = user + case_kwargs['status'] = 1 + case_kwargs['vector_status'] = 0 + case_kwargs['publish_status'] = 1 if data.get('auto_publish') else 0 + case = CaseBase.objects.create(**case_kwargs) + + sub_model = TraditionalCase if case_type == 'traditional' else TeachingCase + allowed_sub = TRADITIONAL_FIELDS if case_type == 'traditional' else TEACHING_FIELDS + sub_model.objects.create(case=case, **{k: v for k, v in sub_data.items() if k in allowed_sub}) + + rule_objs = [ + ScoringRule(case=case, **{k: v for k, v in rule.items() if k in SCORING_RULE_FIELDS}) + for rule in scoring_rules_data + ] + ScoringRule.objects.bulk_create(rule_objs) + + if exam_items_data: + exam_objs = [ + CaseExamItem(case=case, **{k: v for k, v in item.items() if k in EXAM_ITEM_FIELDS}) + for item in exam_items_data + ] + CaseExamItem.objects.bulk_create(exam_objs) + + return case, len(rule_objs), len(exam_items_data) + + +def build_full_response(case): + """组装病例完整结构(主表 + 子表 + scoring_rules + exam_items)。""" + if hasattr(case, '_prefetched_objects_cache') and 'exam_items' in getattr( + case, '_prefetched_objects_cache', {} + ): + exam_qs = case.exam_items.all() + else: + exam_qs = CaseExamItem.objects.filter(case_id=case.id).order_by('display_order', 'id') + + result = { + 'case': { + 'id': case.id, + 'title': case.title, + 'case_type': case.case_type, + 'difficulty': case.difficulty, + 'difficulty_score': case.difficulty_score, + 'institution': case.institution_id, + 'institution_name': case.institution.name if case.institution else None, + 'department': case.department_id, + 'department_name': case.department.name if case.department else None, + 'chief_complaint': case.chief_complaint, + 'description': case.description, + 'patient_age': case.patient_age, + 'patient_gender': case.patient_gender, + 'tags': case.tags, + 'symptom_tags': case.symptom_tags, + 'disease_tags': case.disease_tags, + 'competency_tags': case.competency_tags, + 'guideline_tags': case.guideline_tags, + 'knowledge_points': case.knowledge_points, + 'icd_codes': case.icd_codes, + 'estimated_minutes': case.estimated_minutes, + 'osce_enabled': case.osce_enabled, + 'rag_enabled': case.rag_enabled, + 'ai_prompt_template': case.ai_prompt_template, + 'multimodal_assets': case.multimodal_assets, + 'vector_status': case.vector_status, + 'publish_status': case.publish_status, + 'status': case.status, + 'created_by': case.created_by_id, + 'created_by_name': case.created_by.real_name if case.created_by else None, + 'created_at': case.created_at.isoformat() if case.created_at else None, + 'updated_at': case.updated_at.isoformat() if case.updated_at else None, + }, + } + + if case.case_type == 'traditional': + try: + tc = case.traditionalcase + result['traditional'] = { + 'standard_diagnosis': tc.standard_diagnosis, + 'standard_treatment': tc.standard_treatment, + 'guideline_reference': tc.guideline_reference, + } + except TraditionalCase.DoesNotExist: + result['traditional'] = None + elif case.case_type == 'teaching': + try: + tc = case.teachingcase + result['teaching'] = { + 'teaching_goal': tc.teaching_goal, + 'discussion_questions': tc.discussion_questions, + 'teacher_guide': tc.teacher_guide, + 'scoring_focus': tc.scoring_focus, + } + except TeachingCase.DoesNotExist: + result['teaching'] = None + + rules = case.scoring_rules.all().order_by('id') + result['scoring_rules'] = [ + { + 'id': r.id, + 'dimension': r.dimension, + 'competency_dimension': r.competency_dimension, + 'score_weight': float(r.score_weight), + 'ai_auto_score': r.ai_auto_score, + 'osce_dimension': r.osce_dimension, + 'scoring_standard': r.scoring_standard, + 'rubric_json': r.rubric_json, + } + for r in rules + ] + + result['exam_items'] = [ + { + 'id': e.id, + 'item_code': e.item_code, + 'item_name': e.item_name, + 'item_type': e.item_type, + 'category': e.category, + 'result_text': e.result_text, + 'result_structured': e.result_structured, + 'is_key': e.is_key, + 'is_abnormal': e.is_abnormal, + 'score_weight': float(e.score_weight), + 'display_order': e.display_order, + } + for e in exam_qs + ] + + return result diff --git a/apps/case/views.py b/apps/case/views.py index a354d50..da57b0a 100644 --- a/apps/case/views.py +++ b/apps/case/views.py @@ -23,28 +23,14 @@ from .serializers import ( ) from .services import case_importer, scoring_rule_generator from .services.department_resolver import resolve_department -from .services.exam_items import ( - normalize_exam_items, assert_no_duplicate_exam_items, EXAM_ITEM_FIELDS, +from .services.case_writer import ( + create_case_full, build_full_response as _build_full_response, + validate_scoring_rules as _validate_scoring_rules, + CASE_BASE_FIELDS, TRADITIONAL_FIELDS, TEACHING_FIELDS, SCORING_RULE_FIELDS, ) audit = logging.getLogger('audit') -TRADITIONAL_FIELDS = {'standard_diagnosis', 'standard_treatment', 'guideline_reference'} -TEACHING_FIELDS = {'teaching_goal', 'discussion_questions', 'teacher_guide', 'scoring_focus'} -SCORING_RULE_FIELDS = { - 'dimension', 'competency_dimension', 'score_weight', - 'ai_auto_score', 'osce_dimension', 'scoring_standard', 'rubric_json', -} - -CASE_BASE_FIELDS = { - 'title', 'case_type', 'difficulty', 'difficulty_score', - 'chief_complaint', 'description', 'patient_age', 'patient_gender', - 'tags', 'symptom_tags', 'disease_tags', 'competency_tags', - 'guideline_tags', 'knowledge_points', 'icd_codes', - 'estimated_minutes', 'osce_enabled', 'rag_enabled', - 'ai_prompt_template', 'multimodal_assets', -} - class CaseBaseViewSet(viewsets.ModelViewSet): """病例管理""" @@ -154,65 +140,12 @@ class CaseBaseViewSet(viewsets.ModelViewSet): ) def full_create(self, request): """C3: 创建病例(主表+子表+评分规则同一事务)""" - data = request.data - case_type = data.get('case_type', '') - - if case_type not in ('traditional', 'teaching'): - raise AppError('CASE_TYPE_NOT_SUPPORTED', f'case_type 不支持: {case_type}', status_code=400) - - if 'stages' in data: - raise AppError('CASE_FIELD_NOT_ALLOWED', '本接口不接收 stages 字段', status_code=400) - - sub_data = data.get(case_type) - other_type = 'teaching' if case_type == 'traditional' else 'traditional' - if not sub_data: - raise AppError('CASE_SUBTYPE_REQUIRED', f'{case_type} 子表数据缺失', status_code=400) - if data.get(other_type): - raise AppError('CASE_SUBTYPE_CONFLICT', '不允许同时传入两种子表数据', status_code=400) - - scoring_rules_data = data.get('scoring_rules', []) - if not scoring_rules_data: - raise AppError('CASE_VALIDATION_ERROR', 'scoring_rules 必填且至少 1 条', status_code=400) - _validate_scoring_rules(scoring_rules_data) - - exam_items_data = normalize_exam_items(data.get('exam_items') or []) - assert_no_duplicate_exam_items(exam_items_data) - - department = resolve_department(data.get('department_name', '')) - - with transaction.atomic(): - case_kwargs = {k: data[k] for k in CASE_BASE_FIELDS if k in data} - case_kwargs['department'] = department - case_kwargs['created_by'] = request.user - case_kwargs['status'] = 1 - case_kwargs['vector_status'] = 0 - case_kwargs['publish_status'] = 1 if data.get('auto_publish') else 0 - case = CaseBase.objects.create(**case_kwargs) - - sub_model = TraditionalCase if case_type == 'traditional' else TeachingCase - allowed_sub = TRADITIONAL_FIELDS if case_type == 'traditional' else TEACHING_FIELDS - sub_model.objects.create(case=case, **{k: v for k, v in sub_data.items() if k in allowed_sub}) - - rule_objs = [ - ScoringRule(case=case, **{k: v for k, v in rule.items() if k in SCORING_RULE_FIELDS}) - for rule in scoring_rules_data - ] - ScoringRule.objects.bulk_create(rule_objs) - - if exam_items_data: - exam_objs = [ - CaseExamItem( - case=case, - **{k: v for k, v in item.items() if k in EXAM_ITEM_FIELDS}, - ) - for item in exam_items_data - ] - CaseExamItem.objects.bulk_create(exam_objs) + case, n_rules, n_items = create_case_full(request.data, request.user) audit.info( 'CASE_CREATE case_id=%s from=%s by=%s scoring_rules=%d exam_items=%d', - case.id, data.get('parse_id', 'form'), request.user.id, - len(rule_objs), len(exam_items_data), + case.id, request.data.get('parse_id', 'form'), request.user.id, + n_rules, n_items, ) return Response(_build_full_response(case), status=status.HTTP_201_CREATED) @@ -240,7 +173,7 @@ class CaseBaseViewSet(viewsets.ModelViewSet): if not case: raise AppError('NOT_FOUND', '病例不存在', status_code=404) - if case.publish_status != 1: + if case.publish_status != 2: # 仅「已发布」可公开查看;草稿/正常需作者或管理员 if not request.user.is_authenticated: raise AppError('AUTH_UNAUTHORIZED', '请先登录', status_code=401) if not (request.user.id == case.created_by_id @@ -336,15 +269,14 @@ class CaseBaseViewSet(viewsets.ModelViewSet): @action(detail=True, methods=['post']) def publish(self, request, pk=None): case = self.get_object() - case.publish_status = 1 + case.publish_status = 2 # 已发布 case.save() return Response({'message': '病例已发布'}) @action(detail=True, methods=['post']) def unpublish(self, request, pk=None): case = self.get_object() - case.publish_status = 2 - case.save() + case.delete() # 下架 = 软删除(is_deleted=1) return Response({'message': '病例已下架'}) @@ -384,121 +316,3 @@ class ScoringRuleViewSet(viewsets.ModelViewSet): filterset_fields = ['case', 'dimension', 'ai_auto_score', 'osce_dimension'] -# ── helpers ────────────────────────────────────────────────────────────── - -def _validate_scoring_rules(rules): - if not isinstance(rules, list): - raise AppError('CASE_VALIDATION_ERROR', 'scoring_rules 必须为数组', status_code=400) - for i, rule in enumerate(rules): - if not isinstance(rule, dict): - raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}] 必须为对象', status_code=400) - if not rule.get('dimension'): - raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].dimension 必填', status_code=400) - weight = rule.get('score_weight') - if weight is not None: - try: - weight = float(weight) - except (TypeError, ValueError): - raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].score_weight 须为数字', status_code=400) - if weight <= 0 or weight > 1: - raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].score_weight 须在 (0, 1]', status_code=400) - - -def _build_full_response(case): - if hasattr(case, '_prefetched_objects_cache') and 'exam_items' in getattr( - case, '_prefetched_objects_cache', {} - ): - exam_qs = case.exam_items.all() - else: - exam_qs = CaseExamItem.objects.filter(case_id=case.id).order_by('display_order', 'id') - - result = { - 'case': { - 'id': case.id, - 'title': case.title, - 'case_type': case.case_type, - 'difficulty': case.difficulty, - 'difficulty_score': case.difficulty_score, - 'department': case.department_id, - 'department_name': case.department.name if case.department else None, - 'chief_complaint': case.chief_complaint, - 'description': case.description, - 'patient_age': case.patient_age, - 'patient_gender': case.patient_gender, - 'tags': case.tags, - 'symptom_tags': case.symptom_tags, - 'disease_tags': case.disease_tags, - 'competency_tags': case.competency_tags, - 'guideline_tags': case.guideline_tags, - 'knowledge_points': case.knowledge_points, - 'icd_codes': case.icd_codes, - 'estimated_minutes': case.estimated_minutes, - 'osce_enabled': case.osce_enabled, - 'rag_enabled': case.rag_enabled, - 'ai_prompt_template': case.ai_prompt_template, - 'multimodal_assets': case.multimodal_assets, - 'vector_status': case.vector_status, - 'publish_status': case.publish_status, - 'status': case.status, - 'created_by': case.created_by_id, - 'created_by_name': case.created_by.real_name if case.created_by else None, - 'created_at': case.created_at.isoformat() if case.created_at else None, - 'updated_at': case.updated_at.isoformat() if case.updated_at else None, - }, - } - - if case.case_type == 'traditional': - try: - tc = case.traditionalcase - result['traditional'] = { - 'standard_diagnosis': tc.standard_diagnosis, - 'standard_treatment': tc.standard_treatment, - 'guideline_reference': tc.guideline_reference, - } - except TraditionalCase.DoesNotExist: - result['traditional'] = None - elif case.case_type == 'teaching': - try: - tc = case.teachingcase - result['teaching'] = { - 'teaching_goal': tc.teaching_goal, - 'discussion_questions': tc.discussion_questions, - 'teacher_guide': tc.teacher_guide, - 'scoring_focus': tc.scoring_focus, - } - except TeachingCase.DoesNotExist: - result['teaching'] = None - - rules = case.scoring_rules.all().order_by('id') - result['scoring_rules'] = [ - { - 'id': r.id, - 'dimension': r.dimension, - 'competency_dimension': r.competency_dimension, - 'score_weight': float(r.score_weight), - 'ai_auto_score': r.ai_auto_score, - 'osce_dimension': r.osce_dimension, - 'scoring_standard': r.scoring_standard, - 'rubric_json': r.rubric_json, - } - for r in rules - ] - - result['exam_items'] = [ - { - 'id': e.id, - 'item_code': e.item_code, - 'item_name': e.item_name, - 'item_type': e.item_type, - 'category': e.category, - 'result_text': e.result_text, - 'result_structured': e.result_structured, - 'is_key': e.is_key, - 'is_abnormal': e.is_abnormal, - 'score_weight': float(e.score_weight), - 'display_order': e.display_order, - } - for e in exam_qs - ] - - return result diff --git a/apps/cms/permissions.py b/apps/cms/permissions.py index b79f3d8..58b57d3 100644 --- a/apps/cms/permissions.py +++ b/apps/cms/permissions.py @@ -37,6 +37,32 @@ class IsSuperOrHospitalAdmin(BasePermission): raise AppError('CMS_PERMISSION_DENIED', '需要超级管理员或医院管理员权限', status_code=403) +def is_content_admin(user): + return getattr(user, 'role_type', '') == 'content_admin' + + +def is_hospital_admin(user): + return getattr(user, 'role_type', '') == 'hospital_admin' + + +class IsSuperContentOrHospitalAdmin(BasePermission): + """超级管理员 / 内容管理员 / 医院管理员可访问(病例库 + AI 生成 + 病例审核)。 + + 数据范围(超管全平台;内容/医院管理员仅本机构 `institution`)由 ViewSet 的 + get_queryset 收口;各动作的角色权限(建/改归内容管理员,审核发布归医院管理员) + 在 ViewSet 内逐动作判定。 + """ + + def has_permission(self, request, view): + user = request.user + if user and user.is_authenticated and ( + is_super(user) or is_content_admin(user) or is_hospital_admin(user) + ): + return True + raise AppError('CMS_PERMISSION_DENIED', + '需要超级管理员 / 内容管理员 / 医院管理员权限', status_code=403) + + class IsTeacher(BasePermission): """仅带教医生(role_type=doctor)可访问。 diff --git a/apps/cms/urls.py b/apps/cms/urls.py index 86e35bb..48f79ba 100644 --- a/apps/cms/urls.py +++ b/apps/cms/urls.py @@ -5,4 +5,5 @@ from django.urls import path, include urlpatterns = [ path('', include('apps.organization.urls')), # 机构、科室 path('', include('apps.user.cms_urls')), # 用户 + path('', include('apps.case.cms_urls')), # 病例库 + AI 病例生成 ] diff --git a/apps/user/stats.py b/apps/user/stats.py index ae66524..83c2fca 100644 --- a/apps/user/stats.py +++ b/apps/user/stats.py @@ -21,32 +21,30 @@ from drf_spectacular.utils import extend_schema from apps.training.models import TrainingRecord -# 临床核心能力标准 6 维(A 组) -STANDARD_DIMS = ['病史采集', '查体能力', '检查决策', '诊断能力', '治疗决策', '医患沟通'] +# 临床胜任力标准 5 维(与 training_score_detail / ai_feedback_structured 的评分维度一致) +STANDARD_DIMS = ['信息获取', '分析推理', '处置决策', '沟通人文', '临床整合'] -# 评分维度(fastapi 实际打分维度,名称随病例评分规则而变)→ 标准 6 维 的归并映射。 -# ⚠️ 数据中维度名不统一(已观察到两套来源、且随病例不同),此映射为按实测维度的最佳归并, -# 最终口径需与评分/内容团队对齐。未在映射内的维度会被忽略。 +# 评分维度(fastapi 实际打分维度,名称随病例评分规则而变)→ 标准 5 维 的归并映射。 +# 维度名不完全统一(实测有 检查利用/临床推理/人文沟通 等同义写法),统一归并到 5 个标准维度。 +# 未在映射内的维度会被忽略。 DIMENSION_MAP = { - # 病史采集 - '信息获取': '病史采集', '病史采集': '病史采集', '问诊': '病史采集', - # 查体能力(注:当前 ai_feedback 数据未单列查体维度,多由内容侧 rubric 决定) - '查体能力': '查体能力', '体格检查': '查体能力', '查体': '查体能力', - # 检查决策 - '检查决策': '检查决策', '检查利用': '检查决策', '检查理解': '检查决策', - '辅助检查': '检查决策', '检验决策': '检查决策', - # 诊断能力 - '诊断推理': '诊断能力', '诊断能力': '诊断能力', '鉴别诊断': '诊断能力', - '分析推理': '诊断能力', '临床推理': '诊断能力', '临床整合': '诊断能力', - '知识掌握': '诊断能力', '知识运用': '诊断能力', - # 治疗决策 - '治疗决策': '治疗决策', '处置决策': '治疗决策', '治疗': '治疗决策', - # 医患沟通 - '沟通技巧': '医患沟通', '医患沟通': '医患沟通', - '沟通人文': '医患沟通', '人文沟通': '医患沟通', '沟通': '医患沟通', + # 信息获取(病史/问诊) + '信息获取': '信息获取', '病史采集': '信息获取', '问诊': '信息获取', + # 分析推理(诊断/鉴别/推理) + '分析推理': '分析推理', '临床推理': '分析推理', '诊断推理': '分析推理', + '鉴别诊断': '分析推理', '诊断能力': '分析推理', + # 处置决策(检查决策 + 治疗决策合并) + '处置决策': '处置决策', '检查利用': '处置决策', '检查理解': '处置决策', + '检查决策': '处置决策', '辅助检查': '处置决策', '检验决策': '处置决策', + '治疗决策': '处置决策', '治疗': '处置决策', + # 沟通人文 + '沟通人文': '沟通人文', '人文沟通': '沟通人文', '沟通技巧': '沟通人文', + '医患沟通': '沟通人文', '沟通': '沟通人文', + # 临床整合(含知识掌握/运用) + '临床整合': '临床整合', '知识掌握': '临床整合', '知识运用': '临床整合', } -# 诊断相关维度(诊断准确率口径:这些维度的平均得分率) -DIAGNOSIS_DIMS = {'诊断推理', '诊断能力', '鉴别诊断', '分析推理', '临床推理', '临床整合'} +# 诊断相关维度(诊断准确率口径:这些维度的平均得分率;与归并到「分析推理」的来源一致) +DIAGNOSIS_DIMS = {'诊断推理', '诊断能力', '鉴别诊断', '分析推理', '临床推理'} def _completed_qs(user): diff --git a/test/swagger_cms_case.py b/test/swagger_cms_case.py new file mode 100644 index 0000000..e0667bc --- /dev/null +++ b/test/swagger_cms_case.py @@ -0,0 +1,515 @@ +# -*- coding: utf-8 -*- +""" +CMS 病例库 + AI 病例生成 + 病例审核 — Swagger Try-it-out 等效脚本(真实 HTTP)。 + +覆盖 CSV 行 80~98(超级管理员/内容管理员/医院管理员 各角色拆分): + CMS-CASE-1~7、CMS-CASE-AI-1、CMS-AUDIT-1~3 + +- 详细日志:logs/test-swagger-cms-case-YYYY-MM-DD.log +- 样例 JSON:logs/cms-case-swagger-examples.json(供回填 docx/API - Sheet1.csv) +- PDF/AI 需本地 Django + Redis + .env 中 DEEPSEEK_API_KEY;无 Key 时加 --skip-ai 跳过 + +运行: + .venv\\Scripts\\python.exe test/swagger_cms_case.py + .venv\\Scripts\\python.exe test/swagger_cms_case.py --update-csv + .venv\\Scripts\\python.exe test/swagger_cms_case.py --skip-ai --update-csv +""" +from __future__ import annotations + +import argparse +import csv +import io +import json +import os +import subprocess +import sys +from datetime import datetime +from typing import Any + +import requests + +sys.stdout.reconfigure(encoding='utf-8') +sys.stderr.reconfigure(encoding='utf-8') + +BASE = os.environ.get('SWAGGER_BASE', 'http://127.0.0.1:8000') +PYTHON = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.venv', 'Scripts', 'python.exe') +if not os.path.isfile(PYTHON): + PYTHON = sys.executable +CWD = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +LOG_DIR = os.path.join(CWD, 'logs') +CSV_PATH = os.path.join(CWD, 'docx', 'API - Sheet1.csv') +os.makedirs(LOG_DIR, exist_ok=True) + +LOG_FILE = os.path.join(LOG_DIR, f'test-swagger-cms-case-{datetime.now():%Y-%m-%d}.log') +EXAMPLES_FILE = os.path.join(LOG_DIR, 'cms-case-swagger-examples.json') + +SUPER_PHONE = '13700009001' +CONTENT_PHONE = '13700009002' +HOSPITAL_PHONE = '13700009004' + +_fh = open(LOG_FILE, 'a', encoding='utf-8') +examples: dict[str, dict] = {} +results: list[tuple[str, str, int]] = [] + + +def w(text=''): + line = str(text) + _fh.write(line + '\n') + _fh.flush() + print(line) + + +def django_eval(code: str) -> str: + pre = ( + 'import django,os;' + 'os.environ.setdefault("DJANGO_SETTINGS_MODULE","config.settings");' + 'django.setup();' + ) + p = subprocess.run( + [PYTHON, '-c', pre + code], + capture_output=True, text=True, cwd=CWD, encoding='utf-8', + ) + if p.returncode != 0: + w(f'[django_eval stderr] {p.stderr}') + return (p.stdout or '').strip() + + +def compact_json(obj: Any, max_len: int = 2800) -> str: + s = json.dumps(obj, ensure_ascii=False, separators=(',', ':')) + if len(s) > max_len: + return s[:max_len] + '...(truncated)' + return s + + +def case_payload(title: str, institution_id=None, with_exam=False): + p = { + 'title': title, + 'case_type': 'traditional', + 'difficulty': 'medium', + 'chief_complaint': '发热 3 天', + 'description': '患儿,男,4 岁,因发热 3 天就诊。', + 'patient_age': 4, + 'patient_gender': 'male', + 'department_name': '儿科', + 'estimated_minutes': 30, + 'osce_enabled': False, + 'tags': '儿科,发热', + 'traditional': { + 'standard_diagnosis': '上呼吸道感染', + 'standard_treatment': '对症治疗,退热处理', + 'guideline_reference': '《儿科学》第 9 版', + }, + 'scoring_rules': [ + {'dimension': '诊断准确性', 'score_weight': 0.5, 'ai_auto_score': True, 'scoring_standard': '能准确诊断'}, + {'dimension': '医患沟通', 'score_weight': 0.5, 'ai_auto_score': False, 'scoring_standard': '沟通到位'}, + ], + } + if institution_id is not None: + p['institution_id'] = institution_id + if with_exam: + p['exam_items'] = [{ + 'item_code': 'blood_routine', + 'item_name': '血常规', + 'item_type': 'lab', + 'category': '实验室检查', + 'result_text': 'WBC 10×10^9/L', + 'is_key': True, + 'is_abnormal': False, + 'score_weight': 1.0, + 'display_order': 1, + }] + return p + + +def shrink_list_resp(resp: dict) -> dict: + """列表响应保留 1 条 results 样例,便于 CSV。""" + if not isinstance(resp, dict): + return resp + out = dict(resp) + if isinstance(out.get('results'), list) and out['results']: + out['results'] = [out['results'][0]] + return out + + +def shrink_full_resp(resp: dict) -> dict: + """完整病例响应保留关键字段。""" + if not isinstance(resp, dict): + return resp + c = resp.get('case') or {} + return { + 'case': {k: c.get(k) for k in ( + 'id', 'title', 'case_type', 'institution', 'institution_name', + 'department', 'department_name', 'publish_status', 'created_by_name', 'created_at', + )}, + 'traditional': resp.get('traditional'), + 'scoring_rules': (resp.get('scoring_rules') or [])[:2], + 'exam_items': (resp.get('exam_items') or [])[:1], + } + + +def shrink_ai_resp(resp: dict) -> dict: + if not isinstance(resp, dict): + return resp + data = resp.get('data') or {} + return { + 'parse_id': resp.get('parse_id'), + 'case_type': resp.get('case_type'), + 'ai_usage': resp.get('ai_usage'), + 'prompt_version': resp.get('prompt_version'), + 'parsing_seconds': resp.get('parsing_seconds'), + 'generating_seconds': resp.get('generating_seconds'), + 'data': {k: data.get(k) for k in ( + 'title', 'case_type', 'chief_complaint', 'department_name', 'patient_age', 'patient_gender', + ) if k in data}, + } + + +def call( + csv_key: str, + name: str, + method: str, + path: str, + token: str | None = None, + json_body=None, + params=None, + files=None, + data=None, + expect=200, + shrink=None, +): + """发起 HTTP 请求,记录日志与 CSV 样例。""" + headers = {'Authorization': f'Bearer {token}'} if token else {} + url = BASE + path + r = requests.request( + method, url, headers=headers, json=json_body, params=params, + files=files, data=data, timeout=120, + ) + ct = r.headers.get('content-type', '') + is_json = ct.startswith('application/json') + resp_raw = r.json() if is_json else r.text + resp_store = resp_raw + if shrink == 'list': + resp_store = shrink_list_resp(resp_raw if isinstance(resp_raw, dict) else {}) + elif shrink == 'full': + resp_store = shrink_full_resp(resp_raw if isinstance(resp_raw, dict) else {}) + elif shrink == 'ai': + resp_store = shrink_ai_resp(resp_raw if isinstance(resp_raw, dict) else {}) + elif shrink == 'create': + resp_store = shrink_full_resp(resp_raw if isinstance(resp_raw, dict) else {}) + + if json_body is not None: + req_example = json.dumps(json_body, ensure_ascii=False) + elif files is not None: + fn = list(files.values())[0][0] if files else 'file.pdf' + extra = f', case_type={data.get("case_type")}' if data else '' + req_example = f'multipart/form-data: files={fn}{extra}' + elif params: + req_example = '?' + '&'.join(f'{k}={v}' for k, v in params.items()) + elif method == 'GET' and '{' in path: + req_example = f'路径参数 + 请求头 Authorization: Bearer ' + elif method in ('POST',) and json_body is None and files is None: + req_example = '路径参数 id + 请求头 Authorization(无 Body)' + else: + req_example = '请求头 Authorization: Bearer ' + + exp_list = expect if isinstance(expect, (list, tuple)) else [expect] + ok = 'PASS' if r.status_code in exp_list else 'FAIL' + results.append((csv_key, ok, r.status_code)) + examples[csv_key] = { + 'name': name, + 'method': method, + 'path': path.lstrip('/'), + 'status': r.status_code, + 'req': req_example, + 'resp': resp_store, + } + + w(f'\n[{ok}] {csv_key} {name} -> {method} {path} (status={r.status_code})') + w(f' 请求: {req_example}') + body_str = json.dumps(resp_store, ensure_ascii=False, indent=2) if is_json else str(resp_store) + if len(body_str) > 1800: + body_str = body_str[:1800] + f' ...(截断, 共{len(body_str)}字符)' + w(f' 响应: {body_str}') + return r + + +def cleanup(): + django_eval( + 'from apps.case.models import CaseBase; from apps.user.models import User; ' + f'CaseBase.all_objects.filter(created_by__phone__in=["{SUPER_PHONE}","{CONTENT_PHONE}"]).hard_delete(); ' + f'User.objects.filter(phone__in=["{SUPER_PHONE}","{CONTENT_PHONE}","{HOSPITAL_PHONE}"]).delete(); ' + 'from apps.user.models import Institution; ' + 'Institution.all_objects.filter(code__in=["SWG_CASE_A","SWG_CASE_B"]).hard_delete(); ' + 'print("cleaned")' + ) + + +def setup_tokens(): + w('\n[准备] 建临时机构/用户并签发 token ...') + setup = django_eval( + 'from apps.user.models import User, Institution, Department; ' + 'from rest_framework_simplejwt.tokens import RefreshToken; ' + 'iA,_=Institution.objects.get_or_create(code="SWG_CASE_A", defaults={"name":"病例联调A院","type":"hospital"}); ' + 'iB,_=Institution.objects.get_or_create(code="SWG_CASE_B", defaults={"name":"病例联调B院","type":"hospital"}); ' + 'd1,_=Department.objects.get_or_create(name="儿科", defaults={"category":"临床"}); ' + 'd2,_=Department.objects.get_or_create(name="内科", defaults={"category":"临床"}); ' + f'[User.objects.filter(phone=p).delete() for p in ["{SUPER_PHONE}","{CONTENT_PHONE}","{HOSPITAL_PHONE}"]]; ' + f'su=User.objects.create_user(username="{SUPER_PHONE}",password=None,phone="{SUPER_PHONE}",' + f'real_name="病例超管",role_type="super_admin",institution=iA,status=1); ' + f'cu=User.objects.create_user(username="{CONTENT_PHONE}",password=None,phone="{CONTENT_PHONE}",' + f'real_name="病例内容员",role_type="content_admin",institution=iA,status=1); ' + f'hu=User.objects.create_user(username="{HOSPITAL_PHONE}",password=None,phone="{HOSPITAL_PHONE}",' + f'real_name="病例院管",role_type="hospital_admin",institution=iA,status=1); ' + 'print("|".join([str(RefreshToken.for_user(su).access_token),' + 'str(RefreshToken.for_user(cu).access_token),str(RefreshToken.for_user(hu).access_token),' + 'str(d1.id),str(d2.id),str(iA.id),str(iB.id)]))' + ) + su_tok, cu_tok, hu_tok, d1_id, d2_id, iA_id, iB_id = setup.split('|') + w(f'[准备] 完成 instA={iA_id} instB={iB_id} dept儿科={d1_id} dept内科={d2_id}') + return su_tok, cu_tok, hu_tok, int(d1_id), int(d2_id), int(iA_id), int(iB_id) + + +def run_tests(skip_ai: bool): + su_tok, cu_tok, hu_tok, d1_id, d2_id, iA_id, iB_id = setup_tokens() + + w('\n' + '=' * 90) + w(' 超级管理员 - 病例库') + w('=' * 90) + + # 先建病例供后续列表/查看 + body_super = case_payload('Swagger超管病例(联调)', with_exam=True) + r = call('82', 'CMS-CASE-3 表单新增(超管)', 'POST', '/api/cms/cases/', + su_tok, json_body=body_super, expect=201, shrink='create') + super_case_id = (r.json().get('case') or {}).get('id') + r_b = call('_b', 'CMS-CASE-3 指定B院(流程用)', 'POST', '/api/cms/cases/', + su_tok, json_body=case_payload('Swagger B院病例', institution_id=iB_id), + expect=201, shrink='create') + b_case_id = (r_b.json().get('case') or {}).get('id') + + call('80', 'CMS-CASE-1 病例列表(超管)', 'GET', '/api/cms/cases/', + su_tok, params={'page': 1}, expect=200, shrink='list') + + pdf_bytes = ( + b'%PDF-1.4\n1 0 obj<>endobj\n' + b'2 0 obj<>endobj\n' + b'3 0 obj<>endobj\n' + b'4 0 obj<>stream\nBT /F1 12 Tf 100 700 Td (fever 3d) Tj ET\nendstream\nendobj\n' + b'xref\n0 5\ntrailer<>\nstartxref\n0\n%%EOF' + ) + if not skip_ai: + w('\n[AI] PDF 导入 / AI 生成(需 DEEPSEEK_API_KEY)...') + call('81', 'CMS-CASE-2 PDF导入(超管)', 'POST', '/api/cms/cases/import-pdf/', + su_tok, files={'files': ('case.pdf', pdf_bytes, 'application/pdf')}, + data={'case_type': 'traditional'}, expect=[200, 429, 502, 504], shrink='ai') + call('87', 'CMS-CASE-AI-1 AI生成(超管)', 'POST', '/api/cms/cases/ai-generate/', + su_tok, json_body={ + 'case_type': 'traditional', + 'prompt': '请生成一个儿科急性上呼吸道感染传统病例,患儿4岁男孩,发热咳嗽3天。', + }, expect=[200, 429, 502, 504], shrink='ai') + else: + w('\n[跳过] PDF/AI 接口(--skip-ai)') + + call('83', 'CMS-CASE-4 编辑关联(超管)', 'POST', f'/api/cms/cases/{super_case_id}/relations/', + su_tok, json_body={'institution_id': iB_id, 'department_id': d2_id}, expect=200) + + # 单独草稿用于 submit + r = call('84', 'CMS-CASE-5 提交(超管)', 'POST', f'/api/cms/cases/{super_case_id}/submit/', + su_tok, expect=200) + # 再建一条用于 disable + r2 = call('85', 'CMS-CASE-6 停用(超管)', 'POST', f'/api/cms/cases/{b_case_id}/disable/', + su_tok, expect=200) + + call('86', 'CMS-CASE-7 查看(超管)', 'GET', f'/api/cms/cases/{super_case_id}/full/', + su_tok, expect=200, shrink='full') + + w('\n' + '=' * 90) + w(' 内容管理员 - 病例库') + w('=' * 90) + + body_content = case_payload('Swagger内容员病例(联调)') + r = call('90', 'CMS-CASE-3 表单新增(内容员)', 'POST', '/api/cms/cases/', + cu_tok, json_body=body_content, expect=201, shrink='create') + content_case_id = (r.json().get('case') or {}).get('id') + + call('88', 'CMS-CASE-1 病例列表(内容员)', 'GET', '/api/cms/cases/', + cu_tok, params={'page': 1}, expect=200, shrink='list') + + if not skip_ai: + call('89', 'CMS-CASE-2 PDF导入(内容员)', 'POST', '/api/cms/cases/import-pdf/', + cu_tok, files={'files': ('case.pdf', pdf_bytes, 'application/pdf')}, + data={'case_type': 'traditional'}, expect=[200, 429, 502, 504], shrink='ai') + call('95', 'CMS-CASE-AI-1 AI生成(内容员)', 'POST', '/api/cms/cases/ai-generate/', + cu_tok, json_body={ + 'case_type': 'traditional', + 'prompt': '请生成一个儿科发热传统病例,4岁男孩,发热3天。', + }, expect=[200, 429, 502, 504], shrink='ai') + else: + w('\n[跳过] 内容员 PDF/AI') + + call('91', 'CMS-CASE-4 编辑关联(内容员)', 'POST', f'/api/cms/cases/{content_case_id}/relations/', + cu_tok, json_body={'department_id': d2_id}, expect=200) + + call('92', 'CMS-CASE-5 提交(内容员)', 'POST', f'/api/cms/cases/{content_case_id}/submit/', + cu_tok, expect=200) + + r_dis = call('_dis', '待停用(流程用)', 'POST', '/api/cms/cases/', + cu_tok, json_body=case_payload('Swagger待停用病例'), expect=201, shrink='create') + disable_id = (r_dis.json().get('case') or {}).get('id') + call('93', 'CMS-CASE-6 停用(内容员)', 'POST', f'/api/cms/cases/{disable_id}/disable/', + cu_tok, expect=200) + + call('94', 'CMS-CASE-7 查看(内容员)', 'GET', f'/api/cms/cases/{content_case_id}/full/', + cu_tok, expect=200, shrink='full') + + w('\n' + '=' * 90) + w(' 医院管理员 - 病例审核') + w('=' * 90) + + call('96', 'CMS-AUDIT-1 待审核列表', 'GET', '/api/cms/cases/', + hu_tok, params={'publish_status': 1, 'page': 1}, expect=200, shrink='list') + + call('97', 'CMS-AUDIT-2 查看病例', 'GET', f'/api/cms/cases/{content_case_id}/full/', + hu_tok, expect=200, shrink='full') + + call('98', 'CMS-AUDIT-3 发布', 'POST', f'/api/cms/cases/{content_case_id}/publish/', + hu_tok, expect=200) + + +# 各行「说明」原文(CSV 第 9 列,update 时保持不变) +ROW_NOTES = { + '80': '仅超级管理员。返回全平台病例(可用?institution=按机构过滤)。已软删(下架)不返回。不可发布(归医院管理员 CMS-AUDIT-3)。未登录401;非超管403', + '81': '仅超级管理员。解析预览不落库,前端审核后调 CMS-CASE-3 入库时可传 institution_id 指定任意机构。case_type非法→400;非multipart→415;限流429;AI异常502/504;非超管403', + '82': '仅超级管理员。落库为草稿(publish_status=0),可指定任意 institution_id。错误:CASE_TYPE_NOT_SUPPORTED/CASE_SUBTYPE_REQUIRED/CASE_SUBTYPE_CONFLICT/CASE_VALIDATION_ERROR/CASE_FIELD_NOT_ALLOWED/CASE_INSTITUTION_NOT_FOUND/CASE_DEPARTMENT_NOT_FOUND/CASE_EXAM_ITEM_DUPLICATE;非超管403', + '83': '仅超级管理员。可改全平台任意病例的所属机构/科室,不改病例内容。三者都没传→400 CASE_VALIDATION_ERROR;机构/科室不存在→400;病例不存在/已软删→404;非超管403', + '84': '仅超级管理员。全平台任意病例:草稿(0)→正常(1),进入对应机构医院管理员审核队列。非草稿→400 CASE_NOT_SUBMITTABLE;病例不存在/已软删→404;非超管403', + '85': '仅超级管理员。全平台任意病例停用=下架=软删除(is_deleted=1,不物删)。下架后默认列表/查看不返回;病例不存在/已软删→404;非超管403', + '86': '仅超级管理员。可查看全平台任意病例完整结构。病例不存在/已软删→404;非超管403', + '87': '仅超级管理员。DeepSeek按病例模板(prompts/case_{type}_full.md)生成,不落库;前端审核后走 CMS-CASE-3 入库时可指定 institution_id。prompt空→400;限流429;AI异常500/502/504;非超管403', + '88': '仅内容管理员。仅返回本院(institution=登录用户所属机构)病例;他院不可见。?institution 参数无效(后端强制本院)。已软删不返回。不可审核发布(归医院管理员)。未登录401;非内容管理员403', + '89': '仅内容管理员。解析预览不落库,前端审核后调 CMS-CASE-3 入库时 institution 强制本院(忽略 institution_id)。case_type非法→400;非multipart→415;限流429;AI异常502/504;非内容管理员403', + '90': '仅内容管理员。新建强制 institution=本院(传 institution_id 亦忽略)。落库为草稿(publish_status=0)。错误同 CMS-CASE-3;非内容管理员403', + '91': '仅内容管理员。仅能操作本院病例;可改科室,机构锁定本院(传 institution_id 无效或不可改他院)。不改病例内容。他院病例→404;非内容管理员403', + '92': '仅内容管理员。仅本院病例:草稿(0)→正常(1),进入本院医院管理员审核队列。非草稿→400 CASE_NOT_SUBMITTABLE;他院/不存在/已软删→404;非内容管理员403', + '93': '仅内容管理员。仅本院病例停用=下架=软删除(is_deleted=1)。下架后默认列表/查看不返回;他院/不存在/已软删→404;非内容管理员403', + '94': '仅内容管理员。仅可查看本院病例完整结构。他院/不存在/已软删→404;非内容管理员403', + '95': '仅内容管理员。DeepSeek按病例模板生成,不落库;前端审核后走 CMS-CASE-3 入库时强制落本院。prompt空→400;限流429;AI异常500/502/504;非内容管理员403', + '96': '仅医院管理员。本院 publish_status=1(正常/待审核)病例列表,即 CMS-CASE-1 加 publish_status=1 且后端强制本院。超管/内容管理员请用各自病例列表接口。未登录401;非医院管理员403', + '97': '仅医院管理员。审核前查看本院待审核病例完整内容。他院/不存在/已软删→404;非医院管理员403', + '98': '仅医院管理员(超管/内容管理员均403)。本院病例:正常(1)→已发布(2),发布后对本院移动端医学生可见。非正常→400 CASE_NOT_PUBLISHABLE;他院/不存在/已软删→404', +} + +# 各行 params 字段说明前缀(不含「实际示例」) +ROW_PARAMS_BASE = { + '80': '查询参数(均可选):search(标题/主诉/标签/ICD)、case_type、publish_status(0草稿/1正常/2已发布)、institution(机构ID,可按任意机构过滤)、department(科室ID)、status、osce_enabled、ordering、page;请求头:Authorization', + '81': '请求体(multipart/form-data):files=1~5份.pdf、case_type=traditional|teaching;请求头:Authorization', + '82': '请求体(JSON):title*、case_type*(traditional|teaching)、institution_id(可选,指定落库机构;缺省落创建者机构)、department_name、子表traditional|teaching*、scoring_rules*(≥1,dimension必填、score_weight∈(0,1])、exam_items(可选,item_code不重复)、difficulty/chief_complaint/description/patient_age/patient_gender/tags/icd_codes/estimated_minutes/osce_enabled…;不接收stages;请求头:Authorization', + '83': '路径参数:id;请求体(JSON,至少一项):institution_id(机构ID,null清空,可改任意机构)、department_id(科室ID,null清空) 或 department_name;请求头:Authorization', + '84': '路径参数:id(无Body);请求头:Authorization', + '85': '路径参数:id(无Body);请求头:Authorization', + '86': '路径参数:id;请求头:Authorization', + '87': '请求体(JSON):prompt*(病例长描述)、case_type*(traditional|teaching);请求头:Authorization', + '88': '查询参数(均可选):search、case_type、publish_status(0草稿/1正常/2已发布)、department(科室ID)、status、osce_enabled、ordering、page;请求头:Authorization', + '89': '请求体(multipart/form-data):files=1~5份.pdf、case_type=traditional|teaching;请求头:Authorization', + '90': '请求体(JSON):title*、case_type*(traditional|teaching)、department_name、子表traditional|teaching*、scoring_rules*(≥1,dimension必填、score_weight∈(0,1])、exam_items(可选,item_code不重复)、difficulty/chief_complaint/description/patient_age/patient_gender/tags/icd_codes/estimated_minutes/osce_enabled…;不接收institution_id/stages;请求头:Authorization', + '91': '路径参数:id;请求体(JSON,至少一项):department_id(科室ID,null清空) 或 department_name;请求头:Authorization', + '92': '路径参数:id(无Body);请求头:Authorization', + '93': '路径参数:id(无Body);请求头:Authorization', + '94': '路径参数:id;请求头:Authorization', + '95': '请求体(JSON):prompt*(病例长描述)、case_type*(traditional|teaching);请求头:Authorization', + '96': '查询参数:publish_status=1(正常=待审核)、search、department、page;请求头:Authorization', + '97': '路径参数:id;请求头:Authorization', + '98': '路径参数:id(无Body);请求头:Authorization', +} + + +def update_csv(): + """用 examples 回填 CSV 第 80~98 行的 params / response 列。""" + if not os.path.isfile(EXAMPLES_FILE): + w(f'样例文件不存在: {EXAMPLES_FILE}') + return False + with open(EXAMPLES_FILE, encoding='utf-8') as f: + ex = json.load(f) + + rows = [] + with open(CSV_PATH, encoding='utf-8', newline='') as f: + rows = list(csv.reader(f)) + + updated = 0 + for i, row in enumerate(rows): + if not row or row[0] not in ex: + continue + row_num = row[0] + e = ex[row_num] + method = e.get('method', row[6] if len(row) > 6 else 'GET') + base_params = ROW_PARAMS_BASE.get(row_num, row[7] if len(row) > 7 else '') + req = e.get('req', '') + + if method == 'GET' and req.startswith('?'): + new_params = f'{base_params} | 实际示例:{method} /{e["path"]}{req}' + elif req.startswith('multipart'): + new_params = f'{base_params} | 实际示例:{req}' + elif req.startswith('{') or req.startswith('['): + new_params = f'{base_params} | 实际示例:{req}' + elif '路径参数' in req or '无 Body' in req or 'Bearer' in req: + new_params = f'{base_params} | 实际示例:{method} /{e["path"]}' + else: + new_params = f'{base_params} | 实际示例:{req}' + + new_resp = f'实际返回:{compact_json(e.get("resp", ""))}' + note = ROW_NOTES.get(row_num, row[9] if len(row) > 9 else '') + + rows[i] = row[:7] + [new_params, new_resp, note, '1', row[11] if len(row) > 11 else '0'] + updated += 1 + w(f' CSV 行 #{row_num} 已更新 ({e.get("name")})') + + with open(CSV_PATH, 'w', encoding='utf-8', newline='') as f: + csv.writer(f, quoting=csv.QUOTE_MINIMAL).writerows(rows) + w(f'\nCSV 更新完成:{updated} 行 -> {CSV_PATH}') + return updated > 0 + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--skip-ai', action='store_true', help='跳过 PDF/AI(无 DeepSeek Key 时)') + parser.add_argument('--update-csv', action='store_true', help='测试后回填 docx/API - Sheet1.csv') + parser.add_argument('--csv-only', action='store_true', help='仅从已有 examples JSON 更新 CSV') + args = parser.parse_args() + + w('=' * 90) + w(f' CMS 病例 Swagger 测试 {datetime.now():%Y-%m-%d %H:%M:%S}') + w(f' BASE={BASE}') + w('=' * 90) + + if args.csv_only: + update_csv() + _fh.close() + return 0 + + try: + run_tests(skip_ai=args.skip_ai) + finally: + w('\n[清理] 删除测试病例与用户 ...') + cleanup() + + with open(EXAMPLES_FILE, 'w', encoding='utf-8') as f: + json.dump(examples, f, ensure_ascii=False, indent=2) + + w('\n' + '=' * 90) + total = len(results) + passed = sum(1 for _, ok, _ in results if ok == 'PASS') + w(f' 总计 {total} | 通过 {passed} | 失败 {total - passed}') + fails = [(c, s) for c, ok, s in results if ok == 'FAIL'] + if fails: + w(' 失败: ' + ', '.join(f'{c}({s})' for c, s in fails)) + w(f' 日志: {LOG_FILE}') + w(f' 样例: {EXAMPLES_FILE}') + + if args.update_csv: + update_csv() + + _fh.close() + return 0 if not fails else 1 + + +if __name__ == '__main__': + raise SystemExit(main()) diff --git a/test/test_cms_case.py b/test/test_cms_case.py new file mode 100644 index 0000000..39191fc --- /dev/null +++ b/test/test_cms_case.py @@ -0,0 +1,237 @@ +"""CMS 病例库 + AI 病例生成 + 病例审核测试(超管 / 内容管理员 / 医院管理员)。 + +覆盖 CMS-CASE-1~7 + CMS-CASE-AI-1 + CMS-AUDIT-3(发布)、权限与角色分工、机构(institution) +范围收口、软删除(is_deleted)、状态机(草稿0→正常1→已发布2)、仅 GET/POST。AI 调用全程 mock。 +""" +from unittest.mock import patch + +from rest_framework.test import APIClient + +from apps.case.models import CaseBase +from apps.user.throttling import PdfParseUserThrottle +from .conftest import ( + CacheTestCase, create_test_user, get_auth_client, ensure_department, ensure_institution, + build_traditional_payload, make_fake_pdf, MOCK_C1_PARSE_RESULT, +) + +CASE_URL = '/api/cms/cases/' +IMPORT_PDF_URL = '/api/cms/cases/import-pdf/' +AI_GENERATE_URL = '/api/cms/cases/ai-generate/' + + +def full_url(pk): + return f'/api/cms/cases/{pk}/full/' + + +def relations_url(pk): + return f'/api/cms/cases/{pk}/relations/' + + +def submit_url(pk): + return f'/api/cms/cases/{pk}/submit/' + + +def disable_url(pk): + return f'/api/cms/cases/{pk}/disable/' + + +def publish_url(pk): + return f'/api/cms/cases/{pk}/publish/' + + +class CmsCaseTest(CacheTestCase): + + def setUp(self): + super().setUp() + self.instA = ensure_institution(name='测试医院A', code='CASE-HA') + self.instB = ensure_institution(name='测试医院B', code='CASE-HB') + self.dept = ensure_department('儿科') + self.dept2 = ensure_department('内科') + self.superu = create_test_user(phone='13966600001', role_type='super_admin', institution=self.instA) + self.content = create_test_user(phone='13966600002', role_type='content_admin', institution=self.instA) + self.contentB = create_test_user(phone='13966600004', role_type='content_admin', institution=self.instB) + self.hospital = create_test_user(phone='13966600005', role_type='hospital_admin', institution=self.instA) + self.student = create_test_user(phone='13966600003', role_type='student', institution=self.instA) + self.sclient = get_auth_client(self.superu) + self.cclient = get_auth_client(self.content) + self.hclient = get_auth_client(self.hospital) + + def _create_case(self, client, title='CMS测试病例', institution_id=None): + payload = build_traditional_payload(department_name='儿科', scoring_rules_count=2, with_exam_items=True) + payload['title'] = title + if institution_id is not None: + payload['institution_id'] = institution_id + resp = client.post(CASE_URL, payload, format='json') + self.assertEqual(resp.status_code, 201, resp.content) + return resp.json() + + # ── 权限 ───────────────────────────────────────────────────────────── + def test_requires_auth(self): + self.assertEqual(APIClient().get(CASE_URL).status_code, 401) + + def test_student_forbidden(self): + resp = get_auth_client(self.student).get(CASE_URL) + self.assertEqual(resp.status_code, 403, resp.content) + self.assertEqual(resp.json()['code'], 'CMS_PERMISSION_DENIED') + + # ── CMS-CASE-3 表单新增 ────────────────────────────────────────────── + def test_create_form(self): + created = self._create_case(self.sclient) + self.assertEqual(created['case']['case_type'], 'traditional') + self.assertEqual(created['case']['publish_status'], 0) # 草稿 + self.assertEqual(created['case']['created_by'], self.superu.id) + self.assertEqual(created['case']['institution'], self.instA.id) # 超管缺省落本院 + self.assertEqual(len(created['scoring_rules']), 2) + self.assertEqual(len(created['exam_items']), 1) + + def test_content_admin_create_forces_own_institution(self): + # 内容管理员即便传 institution_id 也强制落本院 + created = self._create_case(self.cclient, institution_id=self.instB.id) + self.assertEqual(created['case']['institution'], self.instA.id) + + def test_create_missing_scoring_rules_400(self): + payload = build_traditional_payload(department_name='儿科') + payload.pop('scoring_rules') + resp = self.sclient.post(CASE_URL, payload, format='json') + self.assertEqual(resp.status_code, 400, resp.content) + + def test_hospital_admin_cannot_create_403(self): + payload = build_traditional_payload(department_name='儿科') + resp = self.hclient.post(CASE_URL, payload, format='json') + self.assertEqual(resp.status_code, 403, resp.content) + + # ── CMS-CASE-1 列表 + 机构范围 ─────────────────────────────────────── + def test_list_scope_by_institution(self): + self._create_case(self.sclient, title='A院病例') # 超管落 instA + self._create_case(self.sclient, title='B院病例', institution_id=self.instB.id) + self.assertEqual(self.sclient.get(CASE_URL).json()['count'], 2) # 超管全部 + c_list = self.cclient.get(CASE_URL).json() # instA 内容管理员 + self.assertEqual(c_list['count'], 1) + self.assertEqual(c_list['results'][0]['title'], 'A院病例') + cb_list = get_auth_client(self.contentB).get(CASE_URL).json() # instB 内容管理员 + self.assertEqual(cb_list['count'], 1) + self.assertEqual(cb_list['results'][0]['title'], 'B院病例') + + # ── CMS-CASE-7 病例查看 + 跨机构 404 ───────────────────────────────── + def test_full_view(self): + cid = self._create_case(self.sclient)['case']['id'] + resp = self.sclient.get(full_url(cid)) + self.assertEqual(resp.status_code, 200, resp.content) + self.assertEqual(resp.json()['case']['id'], cid) + self.assertEqual(resp.json()['case']['institution_name'], '测试医院A') + + def test_content_admin_cannot_touch_other_institution_404(self): + cid = self._create_case(self.sclient, institution_id=self.instB.id)['case']['id'] + self.assertEqual(self.cclient.get(full_url(cid)).status_code, 404) + self.assertEqual(self.cclient.post(disable_url(cid)).status_code, 404) + + # ── CMS-CASE-4 编辑关联(改机构 + 科室)────────────────────────────── + def test_relations_change_institution_and_department(self): + cid = self._create_case(self.sclient)['case']['id'] + resp = self.sclient.post(relations_url(cid), + {'institution_id': self.instB.id, 'department_id': self.dept2.id}) + self.assertEqual(resp.status_code, 200, resp.content) + self.assertEqual(resp.json()['institution_name'], '测试医院B') + self.assertEqual(resp.json()['department_name'], '内科') + case = CaseBase.objects.get(id=cid) + self.assertEqual(case.institution_id, self.instB.id) + self.assertEqual(case.department_id, self.dept2.id) + + def test_relations_bad_institution_400(self): + cid = self._create_case(self.sclient)['case']['id'] + resp = self.sclient.post(relations_url(cid), {'institution_id': 999999}) + self.assertEqual(resp.status_code, 400, resp.content) + + # ── CMS-CASE-5 提交(草稿 → 正常)──────────────────────────────────── + def test_submit_draft_to_normal(self): + cid = self._create_case(self.cclient)['case']['id'] + resp = self.cclient.post(submit_url(cid)) + self.assertEqual(resp.status_code, 200, resp.content) + self.assertEqual(resp.json()['publish_status'], 1) # 正常 + self.assertEqual(self.cclient.post(submit_url(cid)).status_code, 400) # 非草稿重复提交 + + # ── CMS-CASE-6 停用(软删除)───────────────────────────────────────── + def test_disable_soft_delete(self): + cid = self._create_case(self.cclient)['case']['id'] + resp = self.cclient.post(disable_url(cid)) + self.assertEqual(resp.status_code, 200, resp.content) + self.assertFalse(CaseBase.objects.filter(id=cid).exists()) # 默认管理器过滤 + self.assertTrue(CaseBase.all_objects.get(id=cid).is_deleted) # 实际软删 + self.assertEqual(self.cclient.get(CASE_URL).json()['count'], 0) # 列表不再返回 + + # ── CMS-AUDIT-3 发布(正常 → 已发布,医院管理员)───────────────────── + def test_publish_by_hospital_admin(self): + cid = self._create_case(self.cclient)['case']['id'] + self.cclient.post(submit_url(cid)) # 草稿→正常 + resp = self.hclient.post(publish_url(cid)) # 医院管理员发布 + self.assertEqual(resp.status_code, 200, resp.content) + self.assertEqual(resp.json()['publish_status'], 2) # 已发布 + self.assertEqual(CaseBase.objects.get(id=cid).publish_status, 2) + + def test_publish_requires_normal_status_400(self): + cid = self._create_case(self.cclient)['case']['id'] # 仍是草稿(0) + self.assertEqual(self.hclient.post(publish_url(cid)).status_code, 400) + + def test_content_admin_cannot_publish_403(self): + cid = self._create_case(self.cclient)['case']['id'] + self.cclient.post(submit_url(cid)) + self.assertEqual(self.cclient.post(publish_url(cid)).status_code, 403) + + def test_super_admin_cannot_publish_403(self): + # 超级管理员不做病例审核发布 + cid = self._create_case(self.cclient)['case']['id'] + self.cclient.post(submit_url(cid)) + self.assertEqual(self.sclient.post(publish_url(cid)).status_code, 403) + + def test_audit_list_filter_normal(self): + cid = self._create_case(self.cclient)['case']['id'] + self.cclient.post(submit_url(cid)) + audit = self.hclient.get(CASE_URL, {'publish_status': 1}).json() # 待审核(正常) + self.assertEqual(audit['count'], 1) + self.assertEqual(audit['results'][0]['id'], cid) + + # ── 方法收敛:PATCH/DELETE → 405 ───────────────────────────────────── + def test_patch_delete_not_allowed(self): + cid = self._create_case(self.sclient)['case']['id'] + self.assertEqual(self.sclient.patch(f'{CASE_URL}{cid}/', {'title': 'x'}).status_code, 405) + self.assertEqual(self.sclient.delete(f'{CASE_URL}{cid}/').status_code, 405) + + # ── CMS-CASE-2 PDF 导入(mock AI)──────────────────────────────────── + @patch('apps.case.services.case_importer.extract_text_from_pdfs', + return_value='患儿,男,4岁,发热3天。') + def test_import_pdf_preview(self, _mock_pdf): + with ( + patch('apps.case.services.deepseek_client.call_deepseek', return_value=MOCK_C1_PARSE_RESULT), + patch.object(PdfParseUserThrottle, 'allow_request', return_value=True), + ): + resp = self.sclient.post(IMPORT_PDF_URL, + {'files': make_fake_pdf(), 'case_type': 'traditional'}, + format='multipart') + self.assertEqual(resp.status_code, 200, resp.content) + self.assertIn('parse_id', resp.json()) + self.assertEqual(CaseBase.objects.count(), 0) # 仅预览,不落库 + + def test_hospital_admin_cannot_import_or_ai_403(self): + with patch.object(PdfParseUserThrottle, 'allow_request', return_value=True): + self.assertEqual( + self.hclient.post(IMPORT_PDF_URL, {'case_type': 'traditional'}, format='multipart').status_code, 403) + self.assertEqual( + self.hclient.post(AI_GENERATE_URL, {'prompt': 'x', 'case_type': 'traditional'}, format='json').status_code, 403) + + # ── CMS-CASE-AI-1 AI 生成(mock AI)────────────────────────────────── + def test_ai_generate(self): + with ( + patch('apps.case.services.deepseek_client.call_deepseek', return_value=MOCK_C1_PARSE_RESULT), + patch.object(PdfParseUserThrottle, 'allow_request', return_value=True), + ): + resp = self.cclient.post(AI_GENERATE_URL, + {'prompt': '生成一个儿科发热病例', 'case_type': 'traditional'}, + format='json') + self.assertEqual(resp.status_code, 200, resp.content) + self.assertIn('parse_id', resp.json()) + self.assertEqual(CaseBase.objects.count(), 0) # 不落库 + + def test_ai_generate_missing_prompt_400(self): + with patch.object(PdfParseUserThrottle, 'allow_request', return_value=True): + resp = self.cclient.post(AI_GENERATE_URL, {'case_type': 'traditional'}, format='json') + self.assertEqual(resp.status_code, 400, resp.content) diff --git a/test/test_mobile_training_stats.py b/test/test_mobile_training_stats.py index c361863..95f7d6c 100644 --- a/test/test_mobile_training_stats.py +++ b/test/test_mobile_training_stats.py @@ -43,11 +43,11 @@ def _dim(name, score, mx): return {'dimension': name, 'score': score, 'max_score': mx} -# 两条记录的维度评分(得分率:见注释) -DIMS_98 = [_dim('信息获取', 20, 25), _dim('体格检查', 8, 10), _dim('检查决策', 9, 10), - _dim('诊断推理', 18, 20), _dim('治疗决策', 8, 10), _dim('医患沟通', 7, 10)] # 80/80/90/90/80/70 -DIMS_80 = [_dim('信息获取', 20, 25), _dim('体格检查', 8, 10), _dim('检查决策', 8, 10), - _dim('诊断推理', 16, 20), _dim('治疗决策', 8, 10), _dim('医患沟通', 8, 10)] # 80/80/80/80/80/80 +# 两条记录的维度评分(标准 5 维;检查利用→处置决策 用于验证归并)。得分率见行尾注释。 +DIMS_98 = [_dim('信息获取', 20, 25), _dim('分析推理', 18, 20), _dim('检查利用', 9, 10), + _dim('沟通人文', 7, 10), _dim('临床整合', 8, 10)] # 信息80/分析90/处置90/沟通70/整合80 +DIMS_80 = [_dim('信息获取', 20, 25), _dim('分析推理', 16, 20), _dim('处置决策', 8, 10), + _dim('沟通人文', 8, 10), _dim('临床整合', 8, 10)] # 信息80/分析80/处置80/沟通80/整合80 class TrainingStatsTest(TransactionTestCase): @@ -147,13 +147,14 @@ class TrainingStatsTest(TransactionTestCase): d = resp.json() self.assertEqual(d['current_score'], 89.0) radar = {x['dimension']: x['score'] for x in d['radar']} - self.assertEqual(set(radar), {'病史采集', '查体能力', '检查决策', '诊断能力', '治疗决策', '医患沟通'}) - self.assertEqual(radar['病史采集'], 80) # avg(80,80) - self.assertEqual(radar['检查决策'], 85) # avg(90,80) - self.assertEqual(radar['诊断能力'], 85) # 诊断推理 avg(90,80) - self.assertEqual(radar['医患沟通'], 75) # avg(70,80) → 最低 - self.assertEqual(d['weak_dimensions'], ['医患沟通']) - self.assertIn('医患沟通', d['comment']) # 强/弱不同 → 走对比文案 + self.assertEqual(set(radar), {'信息获取', '分析推理', '处置决策', '沟通人文', '临床整合'}) + self.assertEqual(radar['信息获取'], 80) # avg(80,80) + self.assertEqual(radar['分析推理'], 85) # avg(90,80) + self.assertEqual(radar['处置决策'], 85) # 检查利用90 + 处置决策80 → avg + self.assertEqual(radar['临床整合'], 80) # avg(80,80) + self.assertEqual(radar['沟通人文'], 75) # avg(70,80) → 最低 + self.assertEqual(d['weak_dimensions'], ['沟通人文']) + self.assertIn('沟通人文', d['comment']) # 强/弱不同 → 走对比文案 self.assertIn('突出', d['comment']) def test_analysis_balanced_single_record(self):