Files
2026-06-12 17:19:23 +08:00

319 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
from django.db import transaction
from drf_spectacular.utils import extend_schema, OpenApiResponse, inline_serializer
from rest_framework import viewsets, filters, status, serializers as drf_serializers
from rest_framework.decorators import action
from rest_framework.parsers import MultiPartParser
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from config.exceptions import AppError
from apps.user.permissions import IsCaseOperationPermitted
from apps.user.throttling import PdfParseUserThrottle, ScoringRuleGenerateUserThrottle
from .models import (
CaseBase, TraditionalCase, ScriptCase,
TeachingCase, CaseStage, ScoringRule, CaseExamItem,
)
from .serializers import (
CaseBaseListSerializer, CaseBaseDetailSerializer,
CaseBaseCreateSerializer, TraditionalCaseSerializer,
ScriptCaseSerializer, TeachingCaseSerializer,
CaseStageSerializer, ScoringRuleSerializer
)
from .services import case_importer, scoring_rule_generator
from .services.department_resolver import resolve_department
from .services.case_writer import (
create_case_full, build_full_response as _build_full_response,
validate_scoring_rules as _validate_scoring_rules,
CASE_BASE_FIELDS, TRADITIONAL_FIELDS, TEACHING_FIELDS, SCORING_RULE_FIELDS,
)
audit = logging.getLogger('audit')
class CaseBaseViewSet(viewsets.ModelViewSet):
"""病例管理"""
queryset = CaseBase.objects.all()
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = [
'case_type', 'difficulty', 'department',
'publish_status', 'status', 'osce_enabled'
]
search_fields = ['title', 'chief_complaint', 'tags', 'icd_codes']
ordering_fields = ['created_at', 'difficulty_score', 'estimated_minutes']
def get_serializer_class(self):
if self.action == 'list':
return CaseBaseListSerializer
elif self.action == 'create':
return CaseBaseCreateSerializer
return CaseBaseDetailSerializer
# ── C1: parse-pdf ────────────────────────────────────────────────────
@extend_schema(
summary='C1: PDF 解析',
description='上传 1~5 份 PDF,调用 DeepSeek 提取结构化病例数据。不落库,不含评分规则。',
request={'multipart/form-data': {'type': 'object', 'properties': {
'files': {'type': 'array', 'items': {'type': 'string', 'format': 'binary'}},
'case_type': {'type': 'string', 'enum': ['traditional', 'teaching']},
}, 'required': ['files', 'case_type']}},
responses={200: OpenApiResponse(description='解析结果(含 parse_id、data')},
tags=['病例'],
)
@action(
detail=False, methods=['post'], url_path='parse-pdf',
parser_classes=[MultiPartParser],
permission_classes=[IsCaseOperationPermitted],
throttle_classes=[PdfParseUserThrottle],
)
def parse_pdf(self, request):
"""C1: PDF 解析 → 结构化数据(不落库,不含评分规则)"""
files = request.FILES.getlist('files')
case_type = request.data.get('case_type', '')
result = case_importer.parse_pdf(files, case_type, request.user)
return Response(result)
# ── C2: generate-scoring-rules ───────────────────────────────────────
@extend_schema(
summary='C2: AI 生成评分规则预览',
description='传入病例数据 JSON,调用 DeepSeek 生成评分规则。不落库,返回规则列表供前端审核。',
request=inline_serializer('GenerateScoringRulesRequest', fields={
'case_type': drf_serializers.ChoiceField(choices=['traditional', 'teaching'], help_text='病例类型'),
'title': drf_serializers.CharField(help_text='病例标题'),
'chief_complaint': drf_serializers.CharField(required=False, help_text='主诉'),
'traditional': drf_serializers.DictField(required=False, help_text='传统病例子表(case_type=traditional 时传)'),
'teaching': drf_serializers.DictField(required=False, help_text='教学病例子表(case_type=teaching 时传)'),
}),
responses={200: OpenApiResponse(description='评分规则列表 + AI 用量信息')},
tags=['病例'],
)
@action(
detail=False, methods=['post'], url_path='generate-scoring-rules',
permission_classes=[IsCaseOperationPermitted],
throttle_classes=[ScoringRuleGenerateUserThrottle],
)
def generate_scoring_rules(self, request):
"""C2: AI 生成评分规则预览(不落库)"""
result = scoring_rule_generator.generate(request.data)
audit.info(
'CASE_SCORING_RULE_PREVIEW user=%s case_type=%s rules=%d prompt_version=%s',
request.user.id, request.data.get('case_type', ''),
len(result['scoring_rules']), result['prompt_version'],
)
return Response({
'generated': len(result['scoring_rules']),
'ai_usage': result['usage'],
'prompt_version': result['prompt_version'],
'scoring_rules': result['scoring_rules'],
})
# ── C3: full-create ──────────────────────────────────────────────────
@extend_schema(
summary='C3: 创建病例(统一落库入口)',
description='病例主表 + 子表 + scoring_rules(≥1 条)同一事务入库。scoring_rules 必填。',
request=inline_serializer('FullCreateRequest', fields={
'title': drf_serializers.CharField(help_text='病例标题'),
'case_type': drf_serializers.ChoiceField(choices=['traditional', 'teaching']),
'department_name': drf_serializers.CharField(required=False, help_text='科室名称(后端解析为 department_id'),
'traditional': drf_serializers.DictField(required=False),
'teaching': drf_serializers.DictField(required=False),
'scoring_rules': drf_serializers.ListField(child=drf_serializers.DictField(), help_text='评分规则(≥1 条,必填)'),
'exam_items': drf_serializers.ListField(
child=drf_serializers.DictField(), required=False,
help_text='检查项(可选;同一病例 item_code 不可重复)',
),
'parse_id': drf_serializers.CharField(required=False, help_text='来自 parse-pdf 的 parse_id(审计用)'),
'auto_publish': drf_serializers.BooleanField(required=False, default=False),
}),
responses={201: OpenApiResponse(description='完整病例结构(同 GET full')},
tags=['病例'],
)
@action(
detail=False, methods=['post'], url_path='full-create',
permission_classes=[IsCaseOperationPermitted],
)
def full_create(self, request):
"""C3: 创建病例(主表+子表+评分规则同一事务)"""
case, n_rules, n_items = create_case_full(request.data, request.user)
audit.info(
'CASE_CREATE case_id=%s from=%s by=%s scoring_rules=%d exam_items=%d',
case.id, request.data.get('parse_id', 'form'), request.user.id,
n_rules, n_items,
)
return Response(_build_full_response(case), status=status.HTTP_201_CREATED)
# ── C4 / C5: GET + PATCH full ───────────────────────────────────────
@extend_schema(
summary='C4: GET 完整查看 / C5: PATCH 局部编辑草稿',
description='GET: 返回主表+子表+scoring_rules。PATCH: 仅草稿可编辑,scoring_rules 传入时整体替换。',
responses={200: OpenApiResponse(description='完整病例结构')},
tags=['病例'],
)
@action(detail=True, methods=['get', 'patch'], url_path='full')
def full(self, request, pk=None):
"""C4: GET 完整查看 / C5: PATCH 局部编辑草稿"""
if request.method == 'GET':
return self._full_detail(request, pk)
return self._full_update(request, pk)
def _full_detail(self, request, pk):
case = CaseBase.objects.select_related(
'department', 'created_by'
).prefetch_related('scoring_rules', 'exam_items').filter(pk=pk).first()
if not case:
raise AppError('NOT_FOUND', '病例不存在', status_code=404)
if case.publish_status != 2: # 仅「已发布」可公开查看;草稿/正常需作者或管理员
if not request.user.is_authenticated:
raise AppError('AUTH_UNAUTHORIZED', '请先登录', status_code=401)
if not (request.user.id == case.created_by_id
or request.user.role_type in ('super_admin', 'content_admin')
or request.user.is_staff):
raise AppError('CASE_PERMISSION_DENIED', '无权查看该草稿', status_code=403)
return Response(_build_full_response(case))
def _full_update(self, request, pk):
case = CaseBase.objects.select_related('department', 'created_by').filter(pk=pk).first()
if not case:
raise AppError('NOT_FOUND', '病例不存在', status_code=404)
if not (request.user.id == case.created_by_id
or request.user.role_type in ('super_admin', 'content_admin')
or request.user.is_staff):
raise AppError('CASE_PERMISSION_DENIED', '无权编辑该病例', status_code=403)
if case.publish_status != 0:
raise AppError('CASE_NOT_EDITABLE', '仅草稿可编辑,请先下架', status_code=400)
data = request.data
with transaction.atomic():
changed = False
for field in CASE_BASE_FIELDS:
if field in data:
setattr(case, field, data[field])
changed = True
if 'department_name' in data:
case.department = resolve_department(data['department_name'])
changed = True
if changed:
case.save()
case_type = case.case_type
sub_data = data.get(case_type)
if sub_data:
sub_model = TraditionalCase if case_type == 'traditional' else TeachingCase
allowed_sub = TRADITIONAL_FIELDS if case_type == 'traditional' else TEACHING_FIELDS
sub_obj, _ = sub_model.objects.get_or_create(case=case)
for k, v in sub_data.items():
if k in allowed_sub:
setattr(sub_obj, k, v)
sub_obj.save()
scoring_rules_data = data.get('scoring_rules')
if scoring_rules_data is not None:
_validate_scoring_rules(scoring_rules_data)
case.scoring_rules.all().delete()
rule_objs = [
ScoringRule(case=case, **{k: v for k, v in rule.items() if k in SCORING_RULE_FIELDS})
for rule in scoring_rules_data
]
ScoringRule.objects.bulk_create(rule_objs)
audit.info('CASE_UPDATE case_id=%s by=%s', case.id, request.user.id)
case.refresh_from_db()
return Response(_build_full_response(case))
# ── existing actions ─────────────────────────────────────────────────
@action(detail=True, methods=['get'])
def stages(self, request, pk=None):
case = self.get_object()
serializer = CaseStageSerializer(case.stages.all(), many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def add_stage(self, request, pk=None):
case = self.get_object()
serializer = CaseStageSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save(case=case)
return Response(serializer.data, status=status.HTTP_201_CREATED)
@action(detail=True, methods=['get'])
def scoring_rules_list(self, request, pk=None):
case = self.get_object()
serializer = ScoringRuleSerializer(case.scoring_rules.all(), many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def add_scoring_rule(self, request, pk=None):
case = self.get_object()
serializer = ScoringRuleSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save(case=case)
return Response(serializer.data, status=status.HTTP_201_CREATED)
@action(detail=True, methods=['post'])
def publish(self, request, pk=None):
case = self.get_object()
case.publish_status = 2 # 已发布
case.save()
return Response({'message': '病例已发布'})
@action(detail=True, methods=['post'])
def unpublish(self, request, pk=None):
case = self.get_object()
case.delete() # 下架 = 软删除(is_deleted=1
return Response({'message': '病例已下架'})
class TraditionalCaseViewSet(viewsets.ModelViewSet):
queryset = TraditionalCase.objects.all()
serializer_class = TraditionalCaseSerializer
filter_backends = [DjangoFilterBackend]
filterset_fields = ['case']
class ScriptCaseViewSet(viewsets.ModelViewSet):
queryset = ScriptCase.objects.all()
serializer_class = ScriptCaseSerializer
filter_backends = [DjangoFilterBackend]
filterset_fields = ['case']
class TeachingCaseViewSet(viewsets.ModelViewSet):
queryset = TeachingCase.objects.all()
serializer_class = TeachingCaseSerializer
filter_backends = [DjangoFilterBackend]
filterset_fields = ['case']
class CaseStageViewSet(viewsets.ModelViewSet):
queryset = CaseStage.objects.all()
serializer_class = CaseStageSerializer
filter_backends = [DjangoFilterBackend, filters.OrderingFilter]
filterset_fields = ['case', 'stage_type', 'stage_mode']
ordering_fields = ['sort_order', 'created_at']
class ScoringRuleViewSet(viewsets.ModelViewSet):
queryset = ScoringRule.objects.all()
serializer_class = ScoringRuleSerializer
filter_backends = [DjangoFilterBackend]
filterset_fields = ['case', 'dimension', 'ai_auto_score', 'osce_dimension']