# Files
# fastapi/app/services/teaching_service.py
# 2026-06-08 16:49:45 +08:00
#
# 358 lines
# 17 KiB
# Python

from __future__ import annotations

import json
import uuid
from datetime import datetime, timezone
from typing import Any

from sqlalchemy.orm import Session

from app.agents.orchestrator import MedicalConsultationOrchestrator
from app.core.context import UserContext
from app.core.exceptions import AppError
from app.models.training import TrainingSession
from app.models.training_record import TrainingRecord
from app.repositories.evaluation_repository import EvaluationRepository
from app.repositories.source_case_repository import SourceCaseRepository
from app.repositories.teaching_repository import TeachingRepository
from app.schemas.teaching import (
    CreateTeachingEvaluationRequest,
    TeachingCaseSummary,
    TeachingEvaluationResponse,
    TeachingItemsResponse,
    TeachingOption,
    TeachingQuestion,
    TeachingVideo,
)
from app.services.audit_service import AuditService
from app.services.evaluation_service import EvaluationService
from app.services.knowledge_service import KnowledgeService


class TeachingService:
    """Teaching-interaction service.

    Reads the teaching questions attached to a case, checks submitted answers
    against the reference answers and produces an LLM-backed evaluation that is
    persisted as a training record.
    """

    def __init__(self, db: Session) -> None:
        """Build the repositories and collaborating services on one DB session.

        Args:
            db: SQLAlchemy session handed to every repository/service below.
        """
        self.db = db
        self.repo = TeachingRepository(db)
        self.source_repo = SourceCaseRepository(db)
        self.eval_repo = EvaluationRepository(db)
        self.audit = AuditService(db)
        self.knowledge = KnowledgeService(db)
        self.orchestrator = MedicalConsultationOrchestrator()
        self.evaluation_service = EvaluationService(db)

    def list_items(self, ctx: UserContext, case_id: int) -> TeachingItemsResponse:
        """Return the case summary, teaching guidance and questions for a case.

        Args:
            ctx: Caller context, recorded in the audit log.
            case_id: Identifier of the case to load.

        Returns:
            The case summary, teaching goal/guide/scoring focus and the
            normalized questions (options, answer, analysis, video).

        Raises:
            AppError: ``TEACHING_CASE_NOT_FOUND`` (404) when the case is
                missing, inactive, or has no teaching_case attached.
        """
        case = self._require_teaching_case(case_id)
        teaching = case.teaching_case
        questions = self._parse_questions(case)
        self.audit.log(ctx, "teaching.items", "case_base", str(case.id), None)
        summary = TeachingCaseSummary(
            case_id=case.id,
            title=case.title,
            department_id=case.department_id,
            difficulty=case.difficulty,
            chief_complaint=case.chief_complaint,
            description=case.description,
            patient_age=case.patient_age,
            patient_gender=case.patient_gender,
            knowledge_points=case.knowledge_points or [],
        )
        return TeachingItemsResponse(
            case=summary,
            teaching_goal=teaching.teaching_goal,
            teacher_guide=teaching.teacher_guide,
            scoring_focus=teaching.scoring_focus,
            questions=questions,
        )

    async def create_evaluation(
        self,
        ctx: UserContext,
        payload: CreateTeachingEvaluationRequest,
    ) -> TeachingEvaluationResponse:
        """Check the submitted answers, score them with the LLM and persist the result.

        Steps, in order: load the case, grade the answers locally, create a
        completed teaching session, collect guideline references and scoring
        rules, ask the orchestrator for a report, then store the training
        record and its score details.

        Args:
            ctx: Caller context (user, tenant, class, entry scene).
            payload: Case id, score type and the student's answers.

        Returns:
            The stored evaluation, extended with the id of the new session.

        Raises:
            AppError: ``TEACHING_CASE_NOT_FOUND`` (404) when the case is
                unavailable, ``TEACHING_QUESTION_EMPTY`` (400) when it has no
                questions, ``TEACHING_QUESTION_NOT_FOUND`` (404) when an answer
                references an unknown question.
        """
        case = self._require_teaching_case(payload.case_id)
        teaching = case.teaching_case
        questions = self._parse_questions(case)
        if not questions:
            raise AppError("TEACHING_QUESTION_EMPTY", "teaching questions are empty", 400)

        answer_results = self._build_answer_results(questions, payload.answers)
        # The session is flushed before the LLM call so its id is available
        # for the record and the audit entry written afterwards.
        session = self._create_teaching_session(ctx, case.id, payload.score_type, answer_results)
        guideline_result = self.knowledge.search_guidelines(
            case.department_id or 0,
            case.case_type,
            (case.knowledge_points or []) + (case.key_points or []),
        )
        scoring_rules = self.source_repo.get_scoring_rules(case.id)
        teaching_payload = {
            "teaching_goal": teaching.teaching_goal,
            "teacher_guide": teaching.teacher_guide,
            "scoring_focus": teaching.scoring_focus,
            "questions": [item.model_dump() for item in questions],
            "student_answers": [item.model_dump() for item in payload.answers],
            "answer_results": answer_results,
            "correct_count": sum(1 for item in answer_results if item["is_correct"]),
            "total_count": len(answer_results),
        }
        report = await self.orchestrator.evaluate_teaching(
            case=case,
            teaching_payload=teaching_payload,
            scoring_rules=scoring_rules,
            guideline_refs=guideline_result["source_refs"],
            score_type=payload.score_type,
        )

        record = self._build_training_record(
            ctx, session, case, teaching_payload, report, scoring_rules, guideline_result
        )
        self.eval_repo.create_record(record)
        score_details = self.evaluation_service._build_score_details(record.id, report, scoring_rules)
        self.eval_repo.replace_score_details(record.id, score_details)
        self.audit.log(ctx, "teaching.evaluation.generate", "training_record", str(record.id), session.id)
        response = self.evaluation_service._to_response(record)
        return TeachingEvaluationResponse(session_id=session.id, **response.model_dump())

    def _require_teaching_case(self, case_id: int):
        """Load an active teaching case or raise the service's not-found error.

        A case without an attached teaching_case is treated as not found,
        because the callers read teaching_goal / teacher_guide /
        scoring_focus from it and would otherwise fail with AttributeError.

        Raises:
            AppError: ``TEACHING_CASE_NOT_FOUND`` (404).
        """
        case = self.repo.get_active_teaching_case(case_id)
        if not case or case.teaching_case is None:
            raise AppError("TEACHING_CASE_NOT_FOUND", "teaching case not found or inactive", 404)
        return case

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive datetime.

        Yields the same kind of value as the deprecated ``datetime.utcnow()``,
        so stored timestamps keep their existing (timezone-less) form.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _create_teaching_session(
        self,
        ctx: UserContext,
        case_id: int,
        score_type: str,
        answer_results: list[dict],
    ) -> TrainingSession:
        """Create and flush an already-completed session for a teaching interaction.

        Args:
            ctx: Caller context supplying user, tenant, class and entry scene.
            case_id: Case the interaction belongs to.
            score_type: Score type requested by the caller.
            answer_results: Graded answers, stored in the session metadata.

        Returns:
            The session after ``db.flush()``; start and completion time are
            the same instant.
        """
        now = self._utc_now()
        session_code = f"teach_{now.strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex[:8]}"
        session = TrainingSession(
            session_code=session_code,
            user_id=ctx.user_id,
            tenant_id=ctx.tenant_id,
            class_id=ctx.class_id,
            entry_scene=ctx.entry_scene,
            case_id=case_id,
            training_type="teaching_interaction",
            mode="teaching",
            score_type=score_type,
            status="completed",
            started_at=now,
            completed_at=now,
            memory_key=None,
            metadata_={"source": "teaching_interaction", "answer_results": answer_results},
        )
        self.db.add(session)
        self.db.flush()
        return session

    def _build_training_record(
        self,
        ctx: UserContext,
        session: TrainingSession,
        case,
        teaching_payload: dict,
        report: dict,
        scoring_rules: list,
        guideline_result: dict,
    ) -> TrainingRecord:
        """Turn an LLM teaching report into an unsaved ``TrainingRecord``.

        Args:
            ctx: Caller context; ``ctx.user_id`` is stored as the external id.
            session: Teaching session the record is linked to.
            case: Case entity (``id`` and ``title`` are read).
            teaching_payload: Payload that was sent to the orchestrator.
            report: Report returned by the orchestrator; absent keys fall
                back to empty values.
            scoring_rules: Scoring rules of the case (only their count is kept).
            guideline_result: Guideline search result, used for the RAG
                context version.

        Returns:
            A populated record; the caller is responsible for persisting it.
        """
        total_score = float(report.get("total_score") or 0)
        correct_count = teaching_payload.get("correct_count", 0)
        total_count = teaching_payload.get("total_count", 0)
        structured = {
            "score_type": report.get("score_type", session.score_type),
            "total_score": total_score,
            "dimension_scores": report.get("dimension_scores") or [],
            "score_details": report.get("score_details") or [],
            "errors": report.get("errors") or [],
            "improvement_plan": report.get("improvement_plan") or [],
            "evidence_summary": report.get("evidence_summary") or [],
            "guideline_refs": report.get("guideline_refs") or [],
            "overall_comment": report.get("overall_comment") or "",
            "llm_model": report.get("_llm_model"),
            "latency_metrics": report.get("_latency_metrics") or {},
            "teaching_summary": {
                "correct_count": correct_count,
                "total_count": total_count,
            },
        }
        thinking_chain = json.dumps(
            {
                "teaching_goal": teaching_payload.get("teaching_goal", ""),
                "scoring_focus": teaching_payload.get("scoring_focus", ""),
                "answer_results": teaching_payload.get("answer_results", []),
                "scoring_rule_count": len(scoring_rules),
                "guideline_refs": structured["guideline_refs"],
            },
            ensure_ascii=False,
        )
        diagnosis_path = json.dumps(
            {
                "case_title": case.title,
                "question_count": total_count,
                "correct_count": correct_count,
            },
            ensure_ascii=False,
        )
        return TrainingRecord(
            training_mode="teaching",
            case_type="teaching_interaction",
            start_time=session.started_at or self._utc_now(),
            end_time=session.completed_at or self._utc_now(),
            duration_seconds=0,
            total_score=total_score,
            ai_score=total_score,
            teacher_score=None,
            evaluation_level=self.evaluation_service._evaluation_level(total_score, structured["score_type"]),
            status="completed",
            feedback=structured["overall_comment"],
            thinking_chain=thinking_chain,
            diagnosis_path=diagnosis_path,
            wrong_points=structured["errors"],
            missed_questions=[],
            recommendation_result={"improvement_plan": structured["improvement_plan"]},
            ai_feedback_structured=structured,
            osce_station_score={},
            interruption_count=0,
            emotion_analysis={},
            prompt_version="teaching_interaction_v1",
            rag_context_version=self.evaluation_service._rag_context_version(guideline_result),
            case_id=case.id,
            teacher_id=None,
            user_id=self.evaluation_service._numeric_user_id(ctx.user_id),
            external_user_id=ctx.user_id,
            session_id=session.id,
            evaluation_record_id=None,
            score_type=structured["score_type"],
            pdf_file_path=None,
        )

    def _build_answer_results(self, questions: list[TeachingQuestion], answers: list) -> list[dict]:
        """Grade each submitted answer against the reference answer of its question.

        Args:
            questions: Normalized questions of the case.
            answers: Submitted answers; each exposes ``question_id`` and
                ``selected_answer``.

        Returns:
            One result dict per submitted answer, in submission order.

        Raises:
            AppError: ``TEACHING_QUESTION_NOT_FOUND`` (404) when an answer
                references a question id that is not part of the case.
        """
        # NOTE(review): only submitted answers are graded, so unanswered
        # questions are not counted and a question answered twice yields two
        # results - confirm this is the intended scoring basis.
        question_map = {item.question_id: item for item in questions}
        results: list[dict] = []
        for answer in answers:
            # Question ids are normalized to str, so coerce the submitted id
            # too; otherwise an integer id could never match.
            question = question_map.get(str(answer.question_id))
            if not question:
                raise AppError("TEACHING_QUESTION_NOT_FOUND", f"question {answer.question_id} not found", 404)
            selected = self._normalize_answer(answer.selected_answer)
            expected = self._normalize_answer(question.answer)
            results.append(
                {
                    "question_id": question.question_id,
                    "stem": question.stem,
                    "selected_answer": selected,
                    "correct_answer": expected,
                    "is_correct": selected == expected,
                    "analysis": question.analysis,
                    "knowledge_points": question.knowledge_points,
                }
            )
        return results

    def _parse_questions(self, case) -> list[TeachingQuestion]:
        """Return the normalized questions of a case.

        Questions come from ``teaching_case.discussion_questions``; when that
        value is missing or unusable the built-in default questions are used.
        Questions without an explicit id are numbered ``q1``, ``q2``, ...
        """
        items = self._load_question_items(case)
        return [self._normalize_question(item, index) for index, item in enumerate(items, start=1)]

    def _load_question_items(self, case) -> list[dict]:
        """Extract the raw question dicts for a case, falling back to the defaults.

        Accepts a JSON string, or an already-decoded list / dict. A dict is
        expected to hold the list under ``"questions"``. Entries that are not
        dicts are dropped, since they cannot be normalized.
        """
        teaching = case.teaching_case
        raw = teaching.discussion_questions if teaching else ""
        payload: Any
        if isinstance(raw, (dict, list)):
            # Already decoded (e.g. by a JSON column type); json.loads would
            # raise TypeError and the real questions would be discarded.
            payload = raw
        else:
            try:
                payload = json.loads(raw)
            except (TypeError, ValueError):
                # JSONDecodeError is a ValueError; covers None, "" and plain text.
                payload = None
        if isinstance(payload, dict):
            payload = payload.get("questions") or []
        if not isinstance(payload, list):
            return self._fallback_questions(case)
        items = [item for item in payload if isinstance(item, dict)]
        return items or self._fallback_questions(case)

    def _normalize_question(self, item: dict, index: int) -> TeachingQuestion:
        """Convert one raw question dict into a ``TeachingQuestion``.

        Missing fields are filled with defaults: id ``q<index>``, type
        ``single_choice``, empty stem/answer/analysis, no options, no video.
        The stem may be supplied under ``"stem"`` or ``"question"``.

        Args:
            item: Raw question mapping.
            index: 1-based position, used for the default question id.
        """
        raw_options = item.get("options")
        if not isinstance(raw_options, (list, tuple)):
            # Anything that is not a sequence of option dicts counts as "no options".
            raw_options = []
        normalized_options = [
            TeachingOption(key=str(option.get("key") or ""), text=str(option.get("text") or ""))
            for option in raw_options
            if isinstance(option, dict)
        ]
        knowledge_points = item.get("knowledge_points") or []
        if isinstance(knowledge_points, str):
            # A single point given as plain text must not be split into characters.
            knowledge_points = [knowledge_points]
        video = item.get("video")
        return TeachingQuestion(
            question_id=str(item.get("question_id") or f"q{index}"),
            question_type=str(item.get("question_type") or "single_choice"),
            stem=str(item.get("stem") or item.get("question") or ""),
            options=normalized_options,
            answer=item.get("answer") or "",
            analysis=str(item.get("analysis") or ""),
            video=TeachingVideo(**video) if isinstance(video, dict) else None,
            knowledge_points=[str(value) for value in knowledge_points],
        )

    def _normalize_answer(self, value: str | list[str] | None) -> list[str]:
        """Normalize a single- or multi-choice answer for comparison.

        Args:
            value: One option key, a collection of option keys, or ``None``.

        Returns:
            Sorted, upper-cased, whitespace-stripped keys. ``None`` and blank
            entries are dropped, so a missing answer yields ``[]``.
        """
        if value is None:
            return []
        if isinstance(value, (list, tuple, set, frozenset)):
            raw_values = value
        else:
            raw_values = [value]
        cleaned = (str(item).strip().upper() for item in raw_values if item is not None)
        return sorted(token for token in cleaned if token)

    def _fallback_questions(self, case) -> list[dict]:
        """Return the built-in demo questions (paediatric pneumonia).

        Used when the case has no usable structured question data. ``case`` is
        accepted for signature compatibility and is not consulted.
        """
        video = {"title": "儿童肺炎教学示例视频", "url": ""}

        def single_choice(
            number: int,
            stem: str,
            option_texts: tuple,
            answer: str,
            analysis: str,
            knowledge_points: tuple,
        ) -> dict:
            """Build one four-option single-choice question dict."""
            return {
                "question_id": f"q{number}",
                "question_type": "single_choice",
                "stem": stem,
                "options": [{"key": key, "text": text} for key, text in zip("ABCD", option_texts)],
                "answer": answer,
                "analysis": analysis,
                # Own copy per question so changing one entry cannot affect the others.
                "video": dict(video),
                "knowledge_points": list(knowledge_points),
            }

        return [
            single_choice(
                1,
                "该患儿最需要优先关注的病情严重程度指标是?",
                ("体温峰值", "血氧饱和度", "咳嗽天数", "食欲下降"),
                "B",
                "血氧饱和度能帮助判断低氧和肺炎严重程度,是处置决策的重要依据。",
                ("严重程度评估", "血氧判断"),
            ),
            single_choice(
                2,
                "结合发热、咳嗽、喘息和肺部湿啰音,最符合的诊断方向是?",
                ("支气管肺炎", "急性胃肠炎", "泌尿系感染", "单纯过敏性鼻炎"),
                "A",
                "呼吸道症状、肺部体征和影像/炎症指标共同支持儿童支气管肺炎。",
                ("诊断依据", "肺部体征"),
            ),
            single_choice(
                3,
                "下列哪组检查最有助于完善本例肺炎诊断和严重程度评估?",
                (
                    "血常规、CRP、胸片、血氧饱和度",
                    "肝功能、甲状腺功能、腹部超声",
                    "胃镜、幽门螺杆菌、粪便常规",
                    "骨龄片、维生素D、微量元素",
                ),
                "A",
                "炎症指标、胸部影像和血氧情况可共同支撑诊断和严重程度判断。",
                ("检查选择", "辅助检查"),
            ),
            single_choice(
                4,
                "治疗方案中最需要覆盖的核心原则是?",
                (
                    "抗感染、止咳平喘、改善氧合、严密观察",
                    "立即长期激素维持治疗",
                    "只需补充维生素",
                    "无需随访观察",
                ),
                "A",
                "儿童肺炎处置需围绕抗感染、呼吸症状缓解、氧合监测和病情变化预案展开。",
                ("治疗原则", "风险预案"),
            ),
            single_choice(
                5,
                "向家属沟通时,最合适的内容是?",
                (
                    "说明病情、观察指标、用药注意事项和复诊/住院指征",
                    "只告知已经开药即可",
                    "不需要解释检查结果",
                    "避免回答家属担心的问题",
                ),
                "A",
                "儿科场景需要重视家属知情、风险信号识别和家庭护理教育。",
                ("人文沟通", "健康教育"),
            ),
        ]