feat: update cms case api

This commit is contained in:
2026-06-12 17:19:23 +08:00
parent 2fab2be0a1
commit 8fecaeeb54
14 changed files with 1375 additions and 237 deletions
+229
View File
@@ -0,0 +1,229 @@
"""CMS 病例库 + AI 病例生成 + 病例审核。
挂载于 `/api/cms/cases/`,仅 GET/POSTCMS v0.6 约定:查=GET,增删改=POST)。三类角色共用:
- **超级管理员**:全平台病例,建/改/导入/AI生成/编辑关联/提交/停用;**不做病例审核发布**。
- **内容管理员**:仅本机构(`institution`)病例,可建/导入/AI生成/编辑关联/提交/停用;不审核发布。
- **医院管理员**:仅本机构病例,做**病例审核**——查看 + 发布(正常 → 已发布);不建/改。
按设计文档实现(已为 `case_base` 加 `institution` 外键、`is_deleted` 软删除):
- 数据范围以 `institution` 收口(内容/医院管理员仅本院)。
- 病例状态机 `publish_status`0 草稿 →[提交]→ 1 正常 →[医院管理员发布]→ 2 已发布。
- 停用/下架 = **软删除**`is_deleted=1`,默认管理器自动过滤);编辑关联可改所属**机构 + 科室**。
"""
import logging
from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiResponse, inline_serializer
from rest_framework import viewsets, filters, status, serializers as drf_serializers
from rest_framework.decorators import action
from rest_framework.parsers import MultiPartParser
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from config.exceptions import AppError
from apps.cms.permissions import (
IsSuperContentOrHospitalAdmin, is_super, is_content_admin, is_hospital_admin,
)
from apps.user.models import Institution, Department
from apps.user.throttling import PdfParseUserThrottle
from .models import CaseBase
from .serializers import CaseBaseListSerializer
from .services import case_importer
from .services.case_writer import create_case_full, build_full_response
audit = logging.getLogger('audit')
@extend_schema_view(
list=extend_schema(summary='CMS-CASE-1 病例列表', tags=['CMS-病例']),
create=extend_schema(summary='CMS-CASE-3 表单新增病例(草稿)', tags=['CMS-病例']),
)
class CmsCaseViewSet(viewsets.ModelViewSet):
"""CMS 病例库 + 审核。超管全平台、内容/医院管理员仅本机构(`institution`)。"""
serializer_class = CaseBaseListSerializer
permission_classes = [IsAuthenticated, IsSuperContentOrHospitalAdmin]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['case_type', 'publish_status', 'department', 'institution', 'status', 'osce_enabled']
search_fields = ['title', 'chief_complaint', 'tags', 'icd_codes']
ordering_fields = ['created_at', 'difficulty_score', 'estimated_minutes']
# 仅 GET / POST:查=GET,增删改=POST(编辑关联→{id}/relations/,停用→{id}/disable/,发布→{id}/publish/
http_method_names = ['get', 'post', 'head', 'options']
def get_queryset(self):
# 默认管理器已过滤软删(is_deleted=False
qs = CaseBase.objects.select_related('institution', 'department', 'created_by').all().order_by('-created_at')
user = self.request.user
if is_super(user):
return qs
# 内容管理员 / 医院管理员:仅本机构
return qs.filter(institution_id=user.institution_id)
# ── 角色守卫 ─────────────────────────────────────────────────────────
def _require_editor(self):
"""建/改/导入/AI生成/提交/停用:仅超管或内容管理员。"""
if not (is_super(self.request.user) or is_content_admin(self.request.user)):
raise AppError('CMS_PERMISSION_DENIED', '仅超级管理员 / 内容管理员可操作病例内容', status_code=403)
def _require_publisher(self):
"""审核发布:仅医院管理员(超级管理员不做病例审核)。"""
if not is_hospital_admin(self.request.user):
raise AppError('CMS_PERMISSION_DENIED', '病例审核发布仅医院管理员可操作', status_code=403)
def _resolve_institution(self):
"""新建病例的所属机构:内容管理员强制本院;超管可传 institution_id,缺省落本院。"""
user = self.request.user
if is_content_admin(user):
return user.institution
inst_id = self.request.data.get('institution_id')
if inst_id in (None, ''):
return user.institution
inst = Institution.objects.filter(id=inst_id).first()
if not inst:
raise AppError('CASE_INSTITUTION_NOT_FOUND', f'机构 id={inst_id} 不存在', status_code=400)
return inst
# ── CMS-CASE-3 表单新增(草稿)/ CMS-CASE-1 列表 ──────────────────────
def create(self, request, *args, **kwargs):
self._require_editor()
case, n_rules, n_items = create_case_full(request.data, request.user, institution=self._resolve_institution())
audit.info('CMS_CASE_CREATE case_id=%s by=%s inst=%s scoring_rules=%d exam_items=%d',
case.id, request.user.id, case.institution_id, n_rules, n_items)
return Response(build_full_response(case), status=status.HTTP_201_CREATED)
# list 用默认实现(默认管理器已排除软删;医院管理员审核用 ?publish_status=1 取待审核)
# ── CMS-CASE-7 / CMS-AUDIT-2 病例查看(完整结构)─────────────────────
@extend_schema(summary='CMS-CASE-7 病例查看(完整结构)', tags=['CMS-病例'])
@action(detail=True, methods=['get'])
def full(self, request, pk=None):
return Response(build_full_response(self.get_object()))
# ── CMS-CASE-2 PDF 导入(解析预览,不落库)────────────────────────────
@extend_schema(
summary='CMS-CASE-2 PDF 导入(解析预览)',
request={'multipart/form-data': {'type': 'object', 'properties': {
'files': {'type': 'array', 'items': {'type': 'string', 'format': 'binary'}},
'case_type': {'type': 'string', 'enum': ['traditional', 'teaching']},
}, 'required': ['files', 'case_type']}},
responses={200: OpenApiResponse(description='解析结果(parse_id + data')},
tags=['CMS-病例'],
)
@action(detail=False, methods=['post'], url_path='import-pdf',
parser_classes=[MultiPartParser], throttle_classes=[PdfParseUserThrottle])
def import_pdf(self, request):
"""解析 PDF → 结构化病例数据(不落库;前端审核后调 CMS-CASE-3 入库)。"""
self._require_editor()
files = request.FILES.getlist('files')
case_type = request.data.get('case_type', '')
return Response(case_importer.parse_pdf(files, case_type, request.user))
# ── CMS-CASE-AI-1 AI 生成病例内容(不落库)───────────────────────────
@extend_schema(
summary='CMS-CASE-AI-1 AI 生成病例内容',
request=inline_serializer('CmsCaseAiGenerateRequest', fields={
'prompt': drf_serializers.CharField(help_text='病例长描述 prompt'),
'case_type': drf_serializers.ChoiceField(choices=['traditional', 'teaching']),
}),
responses={200: OpenApiResponse(description='生成结果(parse_id + data')},
tags=['CMS-病例'],
)
@action(detail=False, methods=['post'], url_path='ai-generate',
throttle_classes=[PdfParseUserThrottle])
def ai_generate(self, request):
"""一段长 prompt → DeepSeek 按病例模板生成内容(与 PDF 导入同构,不落库)。"""
self._require_editor()
prompt = request.data.get('prompt', '')
case_type = request.data.get('case_type', '')
return Response(case_importer.generate_from_prompt(prompt, case_type, request.user))
# ── CMS-CASE-4 编辑关联信息(改所属机构 / 科室)──────────────────────
@extend_schema(
summary='CMS-CASE-4 编辑关联信息(改所属机构 / 科室)',
request=inline_serializer('CmsCaseRelationsRequest', fields={
'institution_id': drf_serializers.IntegerField(required=False, allow_null=True),
'department_id': drf_serializers.IntegerField(required=False, allow_null=True),
'department_name': drf_serializers.CharField(required=False),
}),
tags=['CMS-病例'],
)
@action(detail=True, methods=['post'])
def relations(self, request, pk=None):
"""改病例「属于哪个机构 / 科室」,不改病例内容。"""
self._require_editor()
case = self.get_object()
data = request.data
changed = []
if 'institution_id' in data:
inst_id = data.get('institution_id')
if inst_id in (None, ''):
case.institution = None
else:
inst = Institution.objects.filter(id=inst_id).first()
if not inst:
raise AppError('CASE_INSTITUTION_NOT_FOUND', f'机构 id={inst_id} 不存在', status_code=400)
case.institution = inst
changed.append('institution')
if 'department_id' in data:
dept_id = data.get('department_id')
if dept_id in (None, ''):
case.department = None
else:
dept = Department.objects.filter(id=dept_id).first()
if not dept:
raise AppError('CASE_DEPARTMENT_NOT_FOUND', f'科室 id={dept_id} 不存在', status_code=400)
case.department = dept
changed.append('department')
elif 'department_name' in data:
from .services.department_resolver import resolve_department
case.department = resolve_department(data.get('department_name', ''))
changed.append('department')
if not changed:
raise AppError('CASE_VALIDATION_ERROR', '请提供 institution_id 或 department_id/department_name', status_code=400)
case.save(update_fields=changed + ['updated_at'])
audit.info('CMS_CASE_RELATIONS case_id=%s by=%s inst=%s dept=%s',
case.id, request.user.id, case.institution_id, case.department_id)
return Response({
'message': '关联信息已更新', 'id': case.id,
'institution': case.institution_id,
'institution_name': case.institution.name if case.institution else None,
'department': case.department_id,
'department_name': case.department.name if case.department else None,
})
# ── CMS-CASE-5 提交(草稿 → 正常)────────────────────────────────────
@extend_schema(summary='CMS-CASE-5 提交(草稿 → 正常)', tags=['CMS-病例'])
@action(detail=True, methods=['post'])
def submit(self, request, pk=None):
self._require_editor()
case = self.get_object()
if case.publish_status != 0:
raise AppError('CASE_NOT_SUBMITTABLE', '仅草稿可提交', status_code=400)
case.publish_status = 1 # 正常(待医院管理员审核发布)
case.save(update_fields=['publish_status', 'updated_at'])
audit.info('CMS_CASE_SUBMIT case_id=%s by=%s', case.id, request.user.id)
return Response({'message': '已提交', 'id': case.id, 'publish_status': case.publish_status})
# ── CMS-CASE-6 停用 / 重录(下架 = 软删除)──────────────────────────
@extend_schema(summary='CMS-CASE-6 停用 / 重录(下架)', tags=['CMS-病例'])
@action(detail=True, methods=['post'])
def disable(self, request, pk=None):
"""停用 = 下架 = 软删除(is_deleted=1),不物理删除。"""
self._require_editor()
case = self.get_object()
case.delete() # SoftDeleteModel:置 is_deleted=True, deleted_at=now
audit.info('CMS_CASE_DISABLE case_id=%s by=%s', case.id, request.user.id)
return Response({'message': '已下架', 'id': case.id})
# ── CMS-AUDIT-3 发布(正常 → 已发布,仅医院管理员审核)───────────────
@extend_schema(summary='CMS-AUDIT-3 发布(正常 → 已发布)', tags=['CMS-病例审核'])
@action(detail=True, methods=['post'])
def publish(self, request, pk=None):
"""医院管理员审核发布:正常(1) → 已发布(2)。发布后对本院移动端医学生可见。超管不做审核。"""
self._require_publisher()
case = self.get_object()
if case.publish_status != 1:
raise AppError('CASE_NOT_PUBLISHABLE', '仅「正常」状态病例可发布', status_code=400)
case.publish_status = 2 # 已发布
case.save(update_fields=['publish_status', 'updated_at'])
audit.info('CMS_CASE_PUBLISH case_id=%s by=%s', case.id, request.user.id)
return Response({'message': '已发布', 'id': case.id, 'publish_status': case.publish_status})
+11
View File
@@ -0,0 +1,11 @@
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .cms import CmsCaseViewSet
router = DefaultRouter()
router.register(r'cases', CmsCaseViewSet, basename='cms-case')
urlpatterns = [
path('', include(router.urls)),
]
@@ -0,0 +1,35 @@
# Generated by Django 5.2.14 on 2026-06-12 07:57
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('case', '0003_case_exam_item'),
('user', '0007_user_deleted_at_user_is_deleted'),
]
operations = [
migrations.AddField(
model_name='casebase',
name='deleted_at',
field=models.DateTimeField(blank=True, null=True, verbose_name='删除时间'),
),
migrations.AddField(
model_name='casebase',
name='institution',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='cases', to='user.institution', verbose_name='所属机构'),
),
migrations.AddField(
model_name='casebase',
name='is_deleted',
field=models.BooleanField(db_index=True, default=False, verbose_name='是否删除'),
),
migrations.AlterField(
model_name='casebase',
name='publish_status',
field=models.SmallIntegerField(choices=[(0, '草稿'), (1, '正常'), (2, '已发布')], default=0, verbose_name='发布状态'),
),
]
+10 -5
View File
@@ -1,10 +1,10 @@
from django.db import models
from apps.common.models import BaseModel
from apps.common.models import BaseModel, SoftDeleteModel
from apps.user.models import User
class CaseBase(BaseModel):
"""病例主表"""
class CaseBase(SoftDeleteModel):
"""病例主表(软删除:`is_deleted=1` 即「已下架」;`objects` 默认过滤已删)。"""
CASE_TYPE_CHOICES = [
('traditional', '传统病例'),
('script', '剧本病例'),
@@ -15,10 +15,11 @@ class CaseBase(BaseModel):
(0, '禁用'),
(1, '正常'),
]
# 病例状态机:草稿 →[提交]→ 正常 →[医院管理员发布]→ 已发布;下架 = 软删除(is_deleted=1
PUBLISH_STATUS_CHOICES = [
(0, '草稿'),
(1, '已发布'),
(2, '下架'),
(1, '正常'),
(2, '发布'),
]
id = models.BigAutoField(primary_key=True)
@@ -26,6 +27,10 @@ class CaseBase(BaseModel):
case_type = models.CharField('病例类型', max_length=30, choices=CASE_TYPE_CHOICES)
difficulty = models.CharField('难度', max_length=20, blank=True)
difficulty_score = models.IntegerField('AI难度评分', null=True, blank=True)
institution = models.ForeignKey(
'user.Institution', on_delete=models.SET_NULL,
null=True, blank=True, related_name='cases', verbose_name='所属机构'
)
department = models.ForeignKey(
'user.Department', on_delete=models.SET_NULL,
null=True, blank=True, verbose_name='所属科室'
+3 -1
View File
@@ -6,6 +6,7 @@ from .models import (
class CaseBaseListSerializer(serializers.ModelSerializer):
institution_name = serializers.CharField(source='institution.name', read_only=True)
department_name = serializers.CharField(source='department.name', read_only=True)
created_by_name = serializers.CharField(source='created_by.real_name', read_only=True)
@@ -13,7 +14,8 @@ class CaseBaseListSerializer(serializers.ModelSerializer):
model = CaseBase
fields = [
'id', 'title', 'case_type', 'difficulty', 'difficulty_score',
'department', 'department_name', 'chief_complaint', 'patient_age',
'institution', 'institution_name', 'department', 'department_name',
'chief_complaint', 'patient_age',
'patient_gender', 'tags', 'estimated_minutes', 'osce_enabled',
'publish_status', 'status', 'created_by_name', 'created_at', 'updated_at'
]
+48
View File
@@ -72,6 +72,54 @@ def parse_pdf(files, case_type: str, user) -> dict:
}
def generate_from_prompt(prompt: str, case_type: str, user) -> dict:
"""CMS-CASE-AI-1: 一段长 prompt → DeepSeek 按病例模板生成结构化病例内容。
产出与 parse_pdf 同构(不落库、不含评分规则),缓存 parse_id 供后续 full-create 复用。
"""
if case_type not in ('traditional', 'teaching'):
raise AppError('CASE_TYPE_NOT_SUPPORTED', f'case_type 不支持: {case_type}', status_code=400)
if not (prompt or '').strip():
raise AppError('CASE_VALIDATION_ERROR', 'prompt 必填', status_code=400)
t0 = time.time()
prompt_name = f'case_{case_type}_full'
system_prompt, prompt_version = load_prompt(prompt_name)
result = deepseek_client.call_deepseek(system_prompt, prompt)
data = result['data']
data.pop('scoring_rules', None)
data.pop('stages', None)
data['case_type'] = case_type
if 'exam_items' in data:
data['exam_items'] = normalize_exam_items(data.get('exam_items') or [])
else:
data['exam_items'] = []
_strip_unknown_fields(data)
_validate_schema(data)
parse_id = uuid.uuid4().hex[:12]
cache.set(f'parse_result:{parse_id}', json.dumps(data, ensure_ascii=False), PARSE_RESULT_TTL)
audit.info(
'CASE_AI_GENERATE user=%s parse_id=%s tokens=%s prompt_version=%s',
user.id, parse_id, result.get('usage', {}), prompt_version,
)
return {
'parse_id': parse_id,
'case_type': case_type,
'ai_usage': result.get('usage', {}),
'prompt_version': prompt_version,
'generating_seconds': round(time.time() - t0, 1),
'data': data,
}
_SCHEMA_ALLOWED_KEYS = {
'title', 'case_type', 'difficulty', 'chief_complaint', 'description',
'patient_age', 'patient_gender', 'tags', 'symptom_tags', 'disease_tags',
+216
View File
@@ -0,0 +1,216 @@
"""病例落库 / 完整结构组装的共享逻辑。
`/api/case/cases/full-create`(业务域)与 `/api/cms/cases/`CMS 内容管理)共用同一套
创建与序列化逻辑,避免两处实现漂移。
"""
from django.db import transaction
from config.exceptions import AppError
from ..models import (
CaseBase, TraditionalCase, TeachingCase, ScoringRule, CaseExamItem,
)
from .department_resolver import resolve_department
from .exam_items import (
normalize_exam_items, assert_no_duplicate_exam_items, EXAM_ITEM_FIELDS,
)
TRADITIONAL_FIELDS = {'standard_diagnosis', 'standard_treatment', 'guideline_reference'}
TEACHING_FIELDS = {'teaching_goal', 'discussion_questions', 'teacher_guide', 'scoring_focus'}
SCORING_RULE_FIELDS = {
'dimension', 'competency_dimension', 'score_weight',
'ai_auto_score', 'osce_dimension', 'scoring_standard', 'rubric_json',
}
CASE_BASE_FIELDS = {
'title', 'case_type', 'difficulty', 'difficulty_score',
'chief_complaint', 'description', 'patient_age', 'patient_gender',
'tags', 'symptom_tags', 'disease_tags', 'competency_tags',
'guideline_tags', 'knowledge_points', 'icd_codes',
'estimated_minutes', 'osce_enabled', 'rag_enabled',
'ai_prompt_template', 'multimodal_assets',
}
def validate_scoring_rules(rules):
if not isinstance(rules, list):
raise AppError('CASE_VALIDATION_ERROR', 'scoring_rules 必须为数组', status_code=400)
for i, rule in enumerate(rules):
if not isinstance(rule, dict):
raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}] 必须为对象', status_code=400)
if not rule.get('dimension'):
raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].dimension 必填', status_code=400)
weight = rule.get('score_weight')
if weight is not None:
try:
weight = float(weight)
except (TypeError, ValueError):
raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].score_weight 须为数字', status_code=400)
if weight <= 0 or weight > 1:
raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].score_weight 须在 (0, 1]', status_code=400)
_UNSET = object()
def create_case_full(data, user, institution=_UNSET):
"""主表 + 子表 + scoring_rules(≥1) + exam_items 同一事务入库。
institution:病例所属机构;默认取创建者所属机构(`user.institution`)。
返回 (case, scoring_rule_count, exam_item_count)。校验失败抛 AppError。
"""
if institution is _UNSET:
institution = getattr(user, 'institution', None)
case_type = data.get('case_type', '')
if case_type not in ('traditional', 'teaching'):
raise AppError('CASE_TYPE_NOT_SUPPORTED', f'case_type 不支持: {case_type}', status_code=400)
if 'stages' in data:
raise AppError('CASE_FIELD_NOT_ALLOWED', '本接口不接收 stages 字段', status_code=400)
sub_data = data.get(case_type)
other_type = 'teaching' if case_type == 'traditional' else 'traditional'
if not sub_data:
raise AppError('CASE_SUBTYPE_REQUIRED', f'{case_type} 子表数据缺失', status_code=400)
if data.get(other_type):
raise AppError('CASE_SUBTYPE_CONFLICT', '不允许同时传入两种子表数据', status_code=400)
scoring_rules_data = data.get('scoring_rules', [])
if not scoring_rules_data:
raise AppError('CASE_VALIDATION_ERROR', 'scoring_rules 必填且至少 1 条', status_code=400)
validate_scoring_rules(scoring_rules_data)
exam_items_data = normalize_exam_items(data.get('exam_items') or [])
assert_no_duplicate_exam_items(exam_items_data)
department = resolve_department(data.get('department_name', ''))
with transaction.atomic():
case_kwargs = {k: data[k] for k in CASE_BASE_FIELDS if k in data}
case_kwargs['department'] = department
case_kwargs['institution'] = institution
case_kwargs['created_by'] = user
case_kwargs['status'] = 1
case_kwargs['vector_status'] = 0
case_kwargs['publish_status'] = 1 if data.get('auto_publish') else 0
case = CaseBase.objects.create(**case_kwargs)
sub_model = TraditionalCase if case_type == 'traditional' else TeachingCase
allowed_sub = TRADITIONAL_FIELDS if case_type == 'traditional' else TEACHING_FIELDS
sub_model.objects.create(case=case, **{k: v for k, v in sub_data.items() if k in allowed_sub})
rule_objs = [
ScoringRule(case=case, **{k: v for k, v in rule.items() if k in SCORING_RULE_FIELDS})
for rule in scoring_rules_data
]
ScoringRule.objects.bulk_create(rule_objs)
if exam_items_data:
exam_objs = [
CaseExamItem(case=case, **{k: v for k, v in item.items() if k in EXAM_ITEM_FIELDS})
for item in exam_items_data
]
CaseExamItem.objects.bulk_create(exam_objs)
return case, len(rule_objs), len(exam_items_data)
def build_full_response(case):
"""组装病例完整结构(主表 + 子表 + scoring_rules + exam_items)。"""
if hasattr(case, '_prefetched_objects_cache') and 'exam_items' in getattr(
case, '_prefetched_objects_cache', {}
):
exam_qs = case.exam_items.all()
else:
exam_qs = CaseExamItem.objects.filter(case_id=case.id).order_by('display_order', 'id')
result = {
'case': {
'id': case.id,
'title': case.title,
'case_type': case.case_type,
'difficulty': case.difficulty,
'difficulty_score': case.difficulty_score,
'institution': case.institution_id,
'institution_name': case.institution.name if case.institution else None,
'department': case.department_id,
'department_name': case.department.name if case.department else None,
'chief_complaint': case.chief_complaint,
'description': case.description,
'patient_age': case.patient_age,
'patient_gender': case.patient_gender,
'tags': case.tags,
'symptom_tags': case.symptom_tags,
'disease_tags': case.disease_tags,
'competency_tags': case.competency_tags,
'guideline_tags': case.guideline_tags,
'knowledge_points': case.knowledge_points,
'icd_codes': case.icd_codes,
'estimated_minutes': case.estimated_minutes,
'osce_enabled': case.osce_enabled,
'rag_enabled': case.rag_enabled,
'ai_prompt_template': case.ai_prompt_template,
'multimodal_assets': case.multimodal_assets,
'vector_status': case.vector_status,
'publish_status': case.publish_status,
'status': case.status,
'created_by': case.created_by_id,
'created_by_name': case.created_by.real_name if case.created_by else None,
'created_at': case.created_at.isoformat() if case.created_at else None,
'updated_at': case.updated_at.isoformat() if case.updated_at else None,
},
}
if case.case_type == 'traditional':
try:
tc = case.traditionalcase
result['traditional'] = {
'standard_diagnosis': tc.standard_diagnosis,
'standard_treatment': tc.standard_treatment,
'guideline_reference': tc.guideline_reference,
}
except TraditionalCase.DoesNotExist:
result['traditional'] = None
elif case.case_type == 'teaching':
try:
tc = case.teachingcase
result['teaching'] = {
'teaching_goal': tc.teaching_goal,
'discussion_questions': tc.discussion_questions,
'teacher_guide': tc.teacher_guide,
'scoring_focus': tc.scoring_focus,
}
except TeachingCase.DoesNotExist:
result['teaching'] = None
rules = case.scoring_rules.all().order_by('id')
result['scoring_rules'] = [
{
'id': r.id,
'dimension': r.dimension,
'competency_dimension': r.competency_dimension,
'score_weight': float(r.score_weight),
'ai_auto_score': r.ai_auto_score,
'osce_dimension': r.osce_dimension,
'scoring_standard': r.scoring_standard,
'rubric_json': r.rubric_json,
}
for r in rules
]
result['exam_items'] = [
{
'id': e.id,
'item_code': e.item_code,
'item_name': e.item_name,
'item_type': e.item_type,
'category': e.category,
'result_text': e.result_text,
'result_structured': e.result_structured,
'is_key': e.is_key,
'is_abnormal': e.is_abnormal,
'score_weight': float(e.score_weight),
'display_order': e.display_order,
}
for e in exam_qs
]
return result
+10 -196
View File
@@ -23,28 +23,14 @@ from .serializers import (
)
from .services import case_importer, scoring_rule_generator
from .services.department_resolver import resolve_department
from .services.exam_items import (
normalize_exam_items, assert_no_duplicate_exam_items, EXAM_ITEM_FIELDS,
from .services.case_writer import (
create_case_full, build_full_response as _build_full_response,
validate_scoring_rules as _validate_scoring_rules,
CASE_BASE_FIELDS, TRADITIONAL_FIELDS, TEACHING_FIELDS, SCORING_RULE_FIELDS,
)
audit = logging.getLogger('audit')
TRADITIONAL_FIELDS = {'standard_diagnosis', 'standard_treatment', 'guideline_reference'}
TEACHING_FIELDS = {'teaching_goal', 'discussion_questions', 'teacher_guide', 'scoring_focus'}
SCORING_RULE_FIELDS = {
'dimension', 'competency_dimension', 'score_weight',
'ai_auto_score', 'osce_dimension', 'scoring_standard', 'rubric_json',
}
CASE_BASE_FIELDS = {
'title', 'case_type', 'difficulty', 'difficulty_score',
'chief_complaint', 'description', 'patient_age', 'patient_gender',
'tags', 'symptom_tags', 'disease_tags', 'competency_tags',
'guideline_tags', 'knowledge_points', 'icd_codes',
'estimated_minutes', 'osce_enabled', 'rag_enabled',
'ai_prompt_template', 'multimodal_assets',
}
class CaseBaseViewSet(viewsets.ModelViewSet):
"""病例管理"""
@@ -154,65 +140,12 @@ class CaseBaseViewSet(viewsets.ModelViewSet):
)
def full_create(self, request):
"""C3: 创建病例(主表+子表+评分规则同一事务)"""
data = request.data
case_type = data.get('case_type', '')
if case_type not in ('traditional', 'teaching'):
raise AppError('CASE_TYPE_NOT_SUPPORTED', f'case_type 不支持: {case_type}', status_code=400)
if 'stages' in data:
raise AppError('CASE_FIELD_NOT_ALLOWED', '本接口不接收 stages 字段', status_code=400)
sub_data = data.get(case_type)
other_type = 'teaching' if case_type == 'traditional' else 'traditional'
if not sub_data:
raise AppError('CASE_SUBTYPE_REQUIRED', f'{case_type} 子表数据缺失', status_code=400)
if data.get(other_type):
raise AppError('CASE_SUBTYPE_CONFLICT', '不允许同时传入两种子表数据', status_code=400)
scoring_rules_data = data.get('scoring_rules', [])
if not scoring_rules_data:
raise AppError('CASE_VALIDATION_ERROR', 'scoring_rules 必填且至少 1 条', status_code=400)
_validate_scoring_rules(scoring_rules_data)
exam_items_data = normalize_exam_items(data.get('exam_items') or [])
assert_no_duplicate_exam_items(exam_items_data)
department = resolve_department(data.get('department_name', ''))
with transaction.atomic():
case_kwargs = {k: data[k] for k in CASE_BASE_FIELDS if k in data}
case_kwargs['department'] = department
case_kwargs['created_by'] = request.user
case_kwargs['status'] = 1
case_kwargs['vector_status'] = 0
case_kwargs['publish_status'] = 1 if data.get('auto_publish') else 0
case = CaseBase.objects.create(**case_kwargs)
sub_model = TraditionalCase if case_type == 'traditional' else TeachingCase
allowed_sub = TRADITIONAL_FIELDS if case_type == 'traditional' else TEACHING_FIELDS
sub_model.objects.create(case=case, **{k: v for k, v in sub_data.items() if k in allowed_sub})
rule_objs = [
ScoringRule(case=case, **{k: v for k, v in rule.items() if k in SCORING_RULE_FIELDS})
for rule in scoring_rules_data
]
ScoringRule.objects.bulk_create(rule_objs)
if exam_items_data:
exam_objs = [
CaseExamItem(
case=case,
**{k: v for k, v in item.items() if k in EXAM_ITEM_FIELDS},
)
for item in exam_items_data
]
CaseExamItem.objects.bulk_create(exam_objs)
case, n_rules, n_items = create_case_full(request.data, request.user)
audit.info(
'CASE_CREATE case_id=%s from=%s by=%s scoring_rules=%d exam_items=%d',
case.id, data.get('parse_id', 'form'), request.user.id,
len(rule_objs), len(exam_items_data),
case.id, request.data.get('parse_id', 'form'), request.user.id,
n_rules, n_items,
)
return Response(_build_full_response(case), status=status.HTTP_201_CREATED)
@@ -240,7 +173,7 @@ class CaseBaseViewSet(viewsets.ModelViewSet):
if not case:
raise AppError('NOT_FOUND', '病例不存在', status_code=404)
if case.publish_status != 1:
if case.publish_status != 2: # 仅「已发布」可公开查看;草稿/正常需作者或管理员
if not request.user.is_authenticated:
raise AppError('AUTH_UNAUTHORIZED', '请先登录', status_code=401)
if not (request.user.id == case.created_by_id
@@ -336,15 +269,14 @@ class CaseBaseViewSet(viewsets.ModelViewSet):
@action(detail=True, methods=['post'])
def publish(self, request, pk=None):
case = self.get_object()
case.publish_status = 1
case.publish_status = 2 # 已发布
case.save()
return Response({'message': '病例已发布'})
@action(detail=True, methods=['post'])
def unpublish(self, request, pk=None):
case = self.get_object()
case.publish_status = 2
case.save()
case.delete() # 下架 = 软删除(is_deleted=1
return Response({'message': '病例已下架'})
@@ -384,121 +316,3 @@ class ScoringRuleViewSet(viewsets.ModelViewSet):
filterset_fields = ['case', 'dimension', 'ai_auto_score', 'osce_dimension']
# ── helpers ──────────────────────────────────────────────────────────────
def _validate_scoring_rules(rules):
if not isinstance(rules, list):
raise AppError('CASE_VALIDATION_ERROR', 'scoring_rules 必须为数组', status_code=400)
for i, rule in enumerate(rules):
if not isinstance(rule, dict):
raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}] 必须为对象', status_code=400)
if not rule.get('dimension'):
raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].dimension 必填', status_code=400)
weight = rule.get('score_weight')
if weight is not None:
try:
weight = float(weight)
except (TypeError, ValueError):
raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].score_weight 须为数字', status_code=400)
if weight <= 0 or weight > 1:
raise AppError('CASE_VALIDATION_ERROR', f'scoring_rules[{i}].score_weight 须在 (0, 1]', status_code=400)
def _build_full_response(case):
if hasattr(case, '_prefetched_objects_cache') and 'exam_items' in getattr(
case, '_prefetched_objects_cache', {}
):
exam_qs = case.exam_items.all()
else:
exam_qs = CaseExamItem.objects.filter(case_id=case.id).order_by('display_order', 'id')
result = {
'case': {
'id': case.id,
'title': case.title,
'case_type': case.case_type,
'difficulty': case.difficulty,
'difficulty_score': case.difficulty_score,
'department': case.department_id,
'department_name': case.department.name if case.department else None,
'chief_complaint': case.chief_complaint,
'description': case.description,
'patient_age': case.patient_age,
'patient_gender': case.patient_gender,
'tags': case.tags,
'symptom_tags': case.symptom_tags,
'disease_tags': case.disease_tags,
'competency_tags': case.competency_tags,
'guideline_tags': case.guideline_tags,
'knowledge_points': case.knowledge_points,
'icd_codes': case.icd_codes,
'estimated_minutes': case.estimated_minutes,
'osce_enabled': case.osce_enabled,
'rag_enabled': case.rag_enabled,
'ai_prompt_template': case.ai_prompt_template,
'multimodal_assets': case.multimodal_assets,
'vector_status': case.vector_status,
'publish_status': case.publish_status,
'status': case.status,
'created_by': case.created_by_id,
'created_by_name': case.created_by.real_name if case.created_by else None,
'created_at': case.created_at.isoformat() if case.created_at else None,
'updated_at': case.updated_at.isoformat() if case.updated_at else None,
},
}
if case.case_type == 'traditional':
try:
tc = case.traditionalcase
result['traditional'] = {
'standard_diagnosis': tc.standard_diagnosis,
'standard_treatment': tc.standard_treatment,
'guideline_reference': tc.guideline_reference,
}
except TraditionalCase.DoesNotExist:
result['traditional'] = None
elif case.case_type == 'teaching':
try:
tc = case.teachingcase
result['teaching'] = {
'teaching_goal': tc.teaching_goal,
'discussion_questions': tc.discussion_questions,
'teacher_guide': tc.teacher_guide,
'scoring_focus': tc.scoring_focus,
}
except TeachingCase.DoesNotExist:
result['teaching'] = None
rules = case.scoring_rules.all().order_by('id')
result['scoring_rules'] = [
{
'id': r.id,
'dimension': r.dimension,
'competency_dimension': r.competency_dimension,
'score_weight': float(r.score_weight),
'ai_auto_score': r.ai_auto_score,
'osce_dimension': r.osce_dimension,
'scoring_standard': r.scoring_standard,
'rubric_json': r.rubric_json,
}
for r in rules
]
result['exam_items'] = [
{
'id': e.id,
'item_code': e.item_code,
'item_name': e.item_name,
'item_type': e.item_type,
'category': e.category,
'result_text': e.result_text,
'result_structured': e.result_structured,
'is_key': e.is_key,
'is_abnormal': e.is_abnormal,
'score_weight': float(e.score_weight),
'display_order': e.display_order,
}
for e in exam_qs
]
return result
+26
View File
@@ -37,6 +37,32 @@ class IsSuperOrHospitalAdmin(BasePermission):
raise AppError('CMS_PERMISSION_DENIED', '需要超级管理员或医院管理员权限', status_code=403)
def is_content_admin(user):
return getattr(user, 'role_type', '') == 'content_admin'
def is_hospital_admin(user):
return getattr(user, 'role_type', '') == 'hospital_admin'
class IsSuperContentOrHospitalAdmin(BasePermission):
"""超级管理员 / 内容管理员 / 医院管理员可访问(病例库 + AI 生成 + 病例审核)。
数据范围(超管全平台;内容/医院管理员仅本机构 `institution`)由 ViewSet 的
get_queryset 收口;各动作的角色权限(建/改归内容管理员,审核发布归医院管理员)
在 ViewSet 内逐动作判定。
"""
def has_permission(self, request, view):
user = request.user
if user and user.is_authenticated and (
is_super(user) or is_content_admin(user) or is_hospital_admin(user)
):
return True
raise AppError('CMS_PERMISSION_DENIED',
'需要超级管理员 / 内容管理员 / 医院管理员权限', status_code=403)
class IsTeacher(BasePermission):
"""仅带教医生(role_type=doctor)可访问。
+1
View File
@@ -5,4 +5,5 @@ from django.urls import path, include
urlpatterns = [
path('', include('apps.organization.urls')), # 机构、科室
path('', include('apps.user.cms_urls')), # 用户
path('', include('apps.case.cms_urls')), # 病例库 + AI 病例生成
]
+21 -23
View File
@@ -21,32 +21,30 @@ from drf_spectacular.utils import extend_schema
from apps.training.models import TrainingRecord
# 临床核心能力标准 6 维(A 组
STANDARD_DIMS = ['病史采集', '查体能力', '检查决策', '诊断能力', '治疗决策', '医患沟通']
# 临床胜任力标准 5 维(与 training_score_detail / ai_feedback_structured 的评分维度一致
STANDARD_DIMS = ['信息获取', '分析推理', '处置决策', '沟通人文', '临床整合']
# 评分维度(fastapi 实际打分维度,名称随病例评分规则而变)→ 标准 6 维 的归并映射。
# ⚠️ 数据中维度名不统一(已观察到两套来源、且随病例不同),此映射为按实测维度的最佳归并,
# 最终口径需与评分/内容团队对齐。未在映射内的维度会被忽略。
# 评分维度(fastapi 实际打分维度,名称随病例评分规则而变)→ 标准 5 维 的归并映射。
# 维度名不完全统一(实测有 检查利用/临床推理/人文沟通 等同义写法),统一归并到 5 个标准维度。
# 未在映射内的维度会被忽略。
DIMENSION_MAP = {
# 病史采集
'信息获取': '病史采集', '病史采集': '病史采集', '问诊': '病史采集',
# 查体能力(注:当前 ai_feedback 数据未单列查体维度,多由内容侧 rubric 决定
'查体能力': '查体能力', '体格检查': '查体能力', '查体': '查体能力',
# 检查决策
'检查决策': '检查决策', '检查利用': '检查决策', '检查理解': '检查决策',
'辅助检查': '检查决策', '决策': '检查决策',
# 诊断能力
'诊断推理': '诊断能力', '诊断能力': '诊断能力', '鉴别诊断': '诊断能力',
'分析推理': '诊断能力', '临床推理': '诊断能力', '临床整合': '诊断能力',
'知识掌握': '诊断能力', '知识运用': '诊断能力',
# 治疗决策
'治疗决策': '治疗决策', '处置决策': '治疗决策', '治疗': '治疗决策',
# 医患沟通
'沟通技巧': '医患沟通', '医患沟通': '医患沟通',
'沟通人文': '医患沟通', '人文沟通': '医患沟通', '沟通': '医患沟通',
# 信息获取(病史/问诊)
'信息获取': '信息获取', '病史采集': '信息获取', '问诊': '信息获取',
# 分析推理(诊断/鉴别/推理
'分析推理': '分析推理', '临床推理': '分析推理', '诊断推理': '分析推理',
'鉴别诊断': '分析推理', '诊断能力': '分析推理',
# 处置决策(检查决策 + 治疗决策合并)
'处置决策': '处置决策', '查利用': '处置决策', '检查理解': '处置决策',
'检查决策': '处置决策', '辅助检查': '处置决策', '检验决策': '处置决策',
'治疗决策': '处置决策', '治疗': '处置决策',
# 沟通人文
'沟通人文': '沟通人文', '人文沟通': '沟通人文', '沟通技巧': '沟通人文',
'医患沟通': '沟通人文', '沟通': '沟通人文',
# 临床整合(含知识掌握/运用)
'临床整合': '临床整合', '知识掌握': '临床整合', '知识运用': '临床整合',
}
# 诊断相关维度(诊断准确率口径:这些维度的平均得分率)
DIAGNOSIS_DIMS = {'诊断推理', '诊断能力', '鉴别诊断', '分析推理', '临床推理', '临床整合'}
# 诊断相关维度(诊断准确率口径:这些维度的平均得分率;与归并到「分析推理」的来源一致
DIAGNOSIS_DIMS = {'诊断推理', '诊断能力', '鉴别诊断', '分析推理', '临床推理'}
def _completed_qs(user):