86 lines
3.9 KiB
Python
86 lines
3.9 KiB
Python
from sqlalchemy.orm import Session
|
|
|
|
from app.core.exceptions import AppError
|
|
from app.models.source_case import CaseBase
|
|
from app.repositories.case_repository import CaseRepository
|
|
from app.repositories.source_case_repository import SourceCaseRepository
|
|
from app.schemas.case import CaseDetailResponse, CaseListItem, CaseListResponse, CasePatientInfo
|
|
|
|
|
|
class CaseService:
    """Case service built on the ``case_base`` table family.

    Provides the published-case list and the training-entry detail view.
    Data access is delegated to ``CaseRepository`` and
    ``SourceCaseRepository``; this class only maps ORM rows onto the
    response schemas.
    """

    # Training types accepted by the current API enum. Any other value found
    # in the source data is reported as ``_DEFAULT_TRAINING_TYPE``.
    _KNOWN_TRAINING_TYPES = frozenset(
        {"case_analysis", "diagnosis_treatment", "consultation"}
    )
    _DEFAULT_TRAINING_TYPE = "diagnosis_treatment"

    def __init__(self, db: Session) -> None:
        """Bind the service and both repositories to one database session."""
        self.db = db
        self.repo = CaseRepository(db)
        self.source_repo = SourceCaseRepository(db)

    def list_cases(
        self,
        department_id: int | None = None,
        training_type: str | None = None,
        mode: str | None = None,
    ) -> CaseListResponse:
        """Return the case list as list-card items.

        All three filters are optional and are passed straight through to
        ``CaseRepository.list_active_cases``; the filtering itself (published
        cases, mode matching against the traditional / teaching-interactive
        extension tables) happens in the repository.
        """
        cases = self.repo.list_active_cases(
            department_id=department_id,
            training_type=training_type,
            mode=mode,
        )
        return CaseListResponse(items=[self._to_list_item(case) for case in cases])

    def get_case_detail(self, case_id: int) -> CaseDetailResponse:
        """Return the training-entry detail for one case.

        Only entry-level information is exposed; standard answers, hidden
        findings and scoring rubrics are deliberately not part of the
        response.

        Raises:
            AppError: ``CASE_NOT_FOUND`` (404) when the repository returns no
                active case for ``case_id``.
        """
        case = self.repo.get_active_case(case_id)
        if case is None:
            raise AppError("CASE_NOT_FOUND", "case not found or inactive", 404)

        order_items = self.repo.get_exam_items(case.id)
        return CaseDetailResponse(
            id=case.id,
            case_code=self._case_code(case),
            title=case.title,
            department=self.source_repo.get_department_name(case.department_id),
            difficulty=case.difficulty,
            # Name and occupation are always returned as None by this view.
            patient=CasePatientInfo(
                name=None,
                age=case.patient_age,
                gender=case.patient_gender,
                occupation=None,
            ),
            chief_complaint=case.chief_complaint,
            supported_training_type=self._training_type(case.case_type),
            supported_mode=self._supported_mode(case),
            has_teaching_video=self._has_video(case),
            has_knowledge_points=bool(case.knowledge_points),
            has_quiz=self._has_quiz(case),
            order_item_types=self._order_item_types(order_items),
        )

    def _to_list_item(self, case: CaseBase) -> CaseListItem:
        """Map one ``case_base`` row onto the list-card schema."""
        return CaseListItem(
            id=case.id,
            case_code=self._case_code(case),
            # The list schema gets 0 when the case has no department.
            department_id=case.department_id or 0,
            title=case.title,
            difficulty=case.difficulty,
            chief_complaint=case.chief_complaint,
            supported_training_type=self._training_type(case.case_type),
            supported_mode=self._supported_mode(case),
            has_teaching_video=self._has_video(case),
            has_knowledge_points=bool(case.knowledge_points),
            has_quiz=self._has_quiz(case),
        )

    @staticmethod
    def _case_code(case: CaseBase) -> str:
        """Build the case code shown to clients: ``SRC_`` plus the case id."""
        return f"SRC_{case.id}"

    @staticmethod
    def _has_quiz(case: CaseBase) -> bool:
        """Tell whether the case has a teaching extension with discussion questions."""
        teaching_case = case.teaching_case
        return bool(teaching_case and teaching_case.discussion_questions)

    @staticmethod
    def _order_item_types(order_items) -> list[str]:
        """Return the distinct ``item_type`` values of ``order_items``, sorted.

        ``None`` types are dropped first: sorting a mix of ``None`` and
        strings raises ``TypeError``, which would fail the whole detail
        request because of a single untyped item.
        """
        distinct_types = {
            item.item_type for item in order_items if item.item_type is not None
        }
        return sorted(distinct_types)

    @staticmethod
    def _supported_mode(case: CaseBase) -> str:
        """Return ``interactive`` for cases with a teaching extension, else ``free_chat``."""
        return "interactive" if case.teaching_case else "free_chat"

    @staticmethod
    def _has_video(case: CaseBase) -> bool:
        """Tell whether ``multimodal_assets`` contains a video entry.

        Only dict entries whose ``type`` equals ``"video"`` count; a missing
        or empty asset list and non-dict entries yield ``False``.
        """
        assets = case.multimodal_assets or []
        return any(
            isinstance(asset, dict) and asset.get("type") == "video"
            for asset in assets
        )

    @staticmethod
    def _training_type(case_type: str | None) -> str:
        """Normalise a source ``case_type`` to a supported training type.

        Values outside the known set (including ``None``) fall back to
        ``diagnosis_treatment``.
        """
        if case_type in CaseService._KNOWN_TRAINING_TYPES:
            return case_type
        return CaseService._DEFAULT_TRAINING_TYPE
|