import json import logging import re import time from typing import Any from app.agents.llm_adapter import DeepSeekClient from app.core.config import settings from app.core.exceptions import AppError from app.models.source_case import CaseBase from app.models.training import SessionOrder, SessionSubmission, TrainingSession logger = logging.getLogger(__name__) class ScoringAgent: """评分 Agent:结合病例、问诊过程、检查结果、提交内容和 scoring_rule 生成结构化评价。""" def __init__(self, llm: DeepSeekClient | None = None) -> None: self.llm = llm or DeepSeekClient() async def score( self, session: TrainingSession, case: CaseBase, memory_messages: list[dict], orders: list[SessionOrder], submission: SessionSubmission, rubric: object | None, guideline_refs: list[dict], scoring_rules: list | None = None, ) -> dict: """评价生成:优先调用快速模型输出 JSON,失败时返回稳定兜底评分。""" start = time.perf_counter() messages = self._build_messages(session, case, memory_messages, orders, submission, guideline_refs, scoring_rules or []) try: response = await self.llm.chat( messages, settings.llm_fast_model, thinking_enabled=settings.llm_fast_thinking_enabled, reasoning_effort=None, response_format={"type": "json_object"} if settings.llm_scoring_json_response else None, max_tokens=min(settings.llm_scoring_max_tokens, 1800), ) data = json.loads(response.content) data = self._normalize_score_payload(data, session.score_type, guideline_refs) data["_llm_model"] = response.model data["_latency_metrics"] = {"scoring_latency_ms": response.latency_ms, "fallback_used": False} return data except (AppError, json.JSONDecodeError, KeyError, TypeError, ValueError) as exc: logger.warning("scoring_agent.fallback session_id=%s error=%s", session.id, exc.__class__.__name__) data = self._fallback_score(session.score_type, guideline_refs) data["_llm_model"] = f"local-fallback-{settings.llm_fast_model}" data["_latency_metrics"] = { "scoring_latency_ms": int((time.perf_counter() - start) * 1000), "fallback_used": True, "fallback_reason": exc.__class__.__name__, } return data async def score_teaching( self, *, case: CaseBase, teaching_payload: dict, scoring_rules: list, guideline_refs: list[dict], score_type: str, ) -> dict: """教学互动评价:根据题目、标准答案、学生作答和评分规则生成结构化评价。""" start = time.perf_counter() messages = self._build_teaching_messages(case, teaching_payload, scoring_rules, guideline_refs, score_type) try: response = await self.llm.chat( messages, settings.llm_fast_model, thinking_enabled=settings.llm_fast_thinking_enabled, reasoning_effort=None, response_format={"type": "json_object"} if settings.llm_scoring_json_response else None, max_tokens=min(settings.llm_scoring_max_tokens, 1600), ) data = json.loads(response.content) data = self._normalize_score_payload(data, score_type, guideline_refs) data["_llm_model"] = response.model data["_latency_metrics"] = {"scoring_latency_ms": response.latency_ms, "fallback_used": False} return data except (AppError, json.JSONDecodeError, KeyError, TypeError, ValueError) as exc: logger.warning("teaching_scoring_agent.fallback case_id=%s error=%s", case.id, exc.__class__.__name__) data = self._fallback_teaching_score(score_type, guideline_refs, teaching_payload) data["_llm_model"] = f"local-fallback-{settings.llm_fast_model}" data["_latency_metrics"] = { "scoring_latency_ms": int((time.perf_counter() - start) * 1000), "fallback_used": True, "fallback_reason": exc.__class__.__name__, } return data def _build_teaching_messages( self, case: CaseBase, teaching_payload: dict, scoring_rules: list, guideline_refs: list[dict], score_type: str, ) -> list[dict]: """教学评分提示词:只传教学互动评价需要的病例、题目、答案和评分规则。""" payload = { "score_type": score_type, "case": { "case_id": case.id, "title": case.title, "chief_complaint": case.chief_complaint, "description": self._truncate(case.description, 320), "knowledge_points": case.knowledge_points or [], "key_points": case.key_points or [], }, "teaching": teaching_payload, "scoring_rules": self._compact_scoring_rules(scoring_rules), "guidelines": self._compact_guidelines(guideline_refs), } system = ( "你是医学教学互动评价专家,只输出合法 JSON。" "请根据病例、教学目标、选择题、标准答案、解析文本、学生作答和评分规则生成教学评价。" "输出字段固定为 score_type,total_score,dimension_scores,errors,improvement_plan," "evidence_summary,guideline_refs,overall_comment,score_details。" "dimension_scores 包含 知识掌握、临床推理、检查理解、治疗决策、人文沟通 维度," "每项包含 dimension,score,max_score,comment,evidence,deductions,improvement。" "score_details 对应 scoring_rules 或题目维度,每项包含 rule_id,dimension,score," "deducted_reason,evidence_message_ids,ai_confidence,comment。" "必须指出答对题目、答错题目、错误原因、下一步学习重点。" "本评价仅用于医学教学训练,不替代真实临床诊疗。" ) return [{"role": "system", "content": system}, {"role": "user", "content": json.dumps(payload, ensure_ascii=False)}] def _fallback_teaching_score(self, score_type: str, guideline_refs: list[dict], teaching_payload: dict) -> dict: """教学评分兜底:LLM 不可用时按选择题正确率生成稳定评价结构。""" results = teaching_payload.get("answer_results") or [] total = len(results) correct = sum(1 for item in results if item.get("is_correct")) accuracy = correct / total if total else 0 total_score = round(accuracy * 100, 1) if total else 0 incorrect = [item for item in results if not item.get("is_correct")] incorrect_titles = [f"{item.get('question_id')}: {item.get('stem', '')}" for item in incorrect[:5]] data = { "score_type": "percentage", "total_score": total_score, "dimension_scores": [ { "dimension": "知识掌握", "score": round(total_score * 0.35, 1), "max_score": 35, "comment": f"共 {total} 题,答对 {correct} 题。", "evidence": [f"正确率 {round(accuracy * 100, 1)}%"], "deductions": incorrect_titles, "improvement": "复习错题对应知识点和病例解析。", }, { "dimension": "临床推理", "score": round(total_score * 0.25, 1), "max_score": 25, "comment": "根据选择题表现评估临床判断链路。", "evidence": [item.get("stem", "") for item in results[:3]], "deductions": incorrect_titles, "improvement": "把题目选项与病例主诉、体征和检查结果逐项对应。", }, { "dimension": "检查理解", "score": round(total_score * 0.15, 1), "max_score": 15, "comment": "重点关注检查项目与病情严重程度判断。", "evidence": teaching_payload.get("scoring_focus", "").split("、")[:3], "deductions": [], "improvement": "理解血氧、胸片和炎症指标在肺炎评估中的作用。", }, { "dimension": "治疗决策", "score": round(total_score * 0.15, 1), "max_score": 15, "comment": "根据题目表现评估治疗原则掌握情况。", "evidence": teaching_payload.get("teaching_goal", "").split("、")[:3], "deductions": [], "improvement": "复习抗感染、平喘、氧合监测和风险预案。", }, { "dimension": "人文沟通", "score": round(total_score * 0.10, 1), "max_score": 10, "comment": "教学互动中需继续强化家属沟通和健康教育。", "evidence": ["教学互动题包含沟通与健康教育相关内容。"], "deductions": [], "improvement": "向家属说明病情、观察指标、复诊指征和用药注意事项。", }, ], "score_details": [], "errors": [ { "title": "教学题目答题错误", "description": ";".join(incorrect_titles) if incorrect_titles else "暂无明显错题。", "severity": "medium" if incorrect_titles else "low", "related_dimension": "知识掌握", } ], "improvement_plan": [ "复盘错题解析,明确每个选项与病例证据的对应关系。", "把病例中的主诉、体征、检查和治疗原则整理成一条临床推理链。", "针对血氧、胸片、炎症指标和医患沟通进行专项复习。", ], "evidence_summary": [ f"教学互动共提交 {total} 题,答对 {correct} 题。", "评分依据包括题目标准答案、解析文本、教学目标和评分规则。", ], "guideline_refs": guideline_refs, "overall_comment": f"本次教学互动正确率为 {round(accuracy * 100, 1)}%,请结合错题解析继续巩固病例关键知识点。", } return self._convert_to_five_point(data) if score_type == "five_point" else data def _build_messages( self, session: TrainingSession, case: CaseBase, memory_messages: list[dict], orders: list[SessionOrder], submission: SessionSubmission, guideline_refs: list[dict], scoring_rules: list, ) -> list[dict]: """评分提示词:只传评价必需字段,避免旧表冗余字段和长病历全文拖慢生成。""" transcript_summary = [ {"role": item.get("role"), "content": self._truncate(item.get("content", ""), 220)} for item in memory_messages[-14:] if item.get("content") ] ordered_payload = [ { "item_code": order.item_code, "item_name": order.item_name, "result_text": self._truncate(order.result_text, 180), "is_key": order.is_key, "is_abnormal": order.is_abnormal, } for order in orders ] payload = { "score_type": session.score_type, "case": { "title": case.title, "chief_complaint": case.chief_complaint, "standard_diagnosis": case.diagnosis_primary, "diagnosis_basis": self._truncate(case.diagnosis_basis, 260), "key_symptoms": case.key_symptoms or [], "key_exams": case.key_exams or [], "key_points": case.key_points or [], }, "conversation": transcript_summary, "orders": ordered_payload, "submission": { "primary_diagnosis": submission.primary_diagnosis, "differential_diagnoses": submission.differential_diagnoses or [], "diagnosis_basis": self._truncate(submission.diagnosis_basis, 320), "treatment_principle": self._truncate(submission.treatment_principle, 220), "treatment_measures": self._truncate(submission.treatment_measures, 320), "risk_plan": self._truncate(submission.risk_plan, 220), "communication": self._truncate(submission.communication, 220), "follow_up": self._truncate(submission.follow_up, 180), }, "scoring_rules": self._compact_scoring_rules(scoring_rules), "guidelines": self._compact_guidelines(guideline_refs), } system = ( "你是医学教学问诊评分专家,只输出合法 JSON。" "请结合病例、问诊过程、检查申请、诊断和治疗提交进行教学评价。" "输出字段固定为 score_type,total_score,dimension_scores,errors,improvement_plan," "evidence_summary,guideline_refs,overall_comment,score_details。" "dimension_scores 为 5-6 项,每项包含 dimension,score,max_score,comment,evidence,deductions,improvement。" "score_details 对应 scoring_rules,每项包含 rule_id,dimension,score,deducted_reason,evidence_message_ids,ai_confidence,comment。" "evidence、deductions、improvement_plan、evidence_summary 必须是数组,每个元素一句话。" "errors 每项包含 title,description,severity,related_dimension。" "评价必须具体指出用户问了什么、申请了什么检查、诊断治疗哪里充分或不足。" "报告仅用于教学训练,不替代真实临床诊疗。" ) return [{"role": "system", "content": system}, {"role": "user", "content": json.dumps(payload, ensure_ascii=False)}] def _compact_scoring_rules(self, scoring_rules: list) -> list[dict]: """源库评分规则压缩:把 scoring_rule 转为评分 Agent 可直接使用的细则。""" compact = [] for item in scoring_rules[:12]: compact.append( { "rule_id": getattr(item, "id", None), "dimension": getattr(item, "dimension", ""), "competency_dimension": getattr(item, "competency_dimension", ""), "score_weight": float(getattr(item, "score_weight", 0) or 0), "ai_auto_score": bool(getattr(item, "ai_auto_score", True)), "osce_dimension": bool(getattr(item, "osce_dimension", False)), "scoring_standard": self._truncate(getattr(item, "scoring_standard", ""), 240), "rubric_json": getattr(item, "rubric_json", {}) or {}, } ) return compact def _compact_guidelines(self, guideline_refs: list[dict]) -> list[dict]: """参考指南压缩:保留少量命中来源,供模型引用。""" compact = [] for item in (guideline_refs or [])[:3]: if isinstance(item, dict): compact.append( { "title": item.get("title") or item.get("source"), "content": self._truncate(item.get("content") or item.get("chunk"), 180), } ) return compact def _normalize_score_payload(self, data: dict, score_type: str, guideline_refs: list[dict]) -> dict: """评分结构校验:补齐缺失字段,并拆分模型返回的长编号列表。""" if not isinstance(data, dict): data = self._fallback_score(score_type, guideline_refs) data.setdefault("score_type", "percentage") data.setdefault("total_score", 0) data.setdefault("dimension_scores", []) data.setdefault("score_details", []) data.setdefault("errors", []) data.setdefault("improvement_plan", []) data.setdefault("evidence_summary", []) data.setdefault("guideline_refs", guideline_refs) data.setdefault("overall_comment", "") normalized_dimensions = [] for item in data.get("dimension_scores") or []: if not isinstance(item, dict): continue normalized_dimensions.append( { "dimension": str(item.get("dimension") or "未命名维度"), "score": float(item.get("score") or 0), "max_score": float(item.get("max_score") or (100 if data.get("score_type") == "percentage" else 5)), "comment": self._truncate(item.get("comment") or "", 180), "evidence": self._ensure_list(item.get("evidence")), "deductions": self._ensure_list(item.get("deductions")), "improvement": self._truncate(item.get("improvement") or "", 180), } ) data["dimension_scores"] = normalized_dimensions or self._fallback_score("percentage", guideline_refs)["dimension_scores"] data["score_details"] = self._normalize_score_details(data.get("score_details"), data["dimension_scores"]) data["errors"] = self._normalize_errors(data.get("errors")) data["improvement_plan"] = self._ensure_list(data.get("improvement_plan")) data["evidence_summary"] = self._ensure_list(data.get("evidence_summary")) data["guideline_refs"] = self._normalize_guideline_refs(data.get("guideline_refs"), guideline_refs) data["overall_comment"] = self._truncate(data.get("overall_comment") or "", 260) if score_type == "five_point" and data.get("score_type") != "five_point": return self._convert_to_five_point(data) if score_type == "percentage" and data.get("score_type") != "percentage": data["score_type"] = "percentage" return data def _normalize_score_details(self, raw_details: object, dimension_scores: list[dict]) -> list[dict]: """评分明细归一化:生成可落库的 training_score_detail 数据。""" source = raw_details if isinstance(raw_details, list) and raw_details else dimension_scores details = [] for item in source: if not isinstance(item, dict): continue deducted_reason = item.get("deducted_reason") if not deducted_reason: deducted_reason = ";".join(str(value) for value in item.get("deductions", []) if value) details.append( { "rule_id": item.get("rule_id"), "dimension": str(item.get("dimension") or "综合表现"), "score": float(item.get("score") or 0), "deducted_reason": self._truncate(deducted_reason or "", 260), "evidence_message_ids": self._ensure_list(item.get("evidence_message_ids") or item.get("evidence")), "ai_confidence": float(item.get("ai_confidence") or 0.85), "comment": self._truncate(item.get("comment") or item.get("improvement") or "", 220), } ) return details def _normalize_errors(self, errors: object) -> list[dict]: """错误项归一化:转为报告可渲染的扣分项。""" normalized = [] for index, item in enumerate(self._ensure_list(errors), start=1): if isinstance(item, dict): normalized.append( { "title": self._truncate(item.get("title") or f"问题 {index}", 60), "description": self._truncate(item.get("description") or item.get("comment") or "", 180), "severity": item.get("severity") or "medium", "related_dimension": item.get("related_dimension") or item.get("dimension") or "综合表现", } ) else: normalized.append( { "title": f"问题 {index}", "description": self._truncate(str(item), 180), "severity": "medium", "related_dimension": "综合表现", } ) return normalized[:6] def _normalize_guideline_refs(self, value: object, fallback_refs: list[dict]) -> list[dict]: """指南引用归一化:保证字符串、字典或空值都能转成字典数组。""" raw_items = value if isinstance(value, list) else ([value] if value else fallback_refs) normalized = [] for index, item in enumerate(raw_items or [], start=1): if isinstance(item, dict): normalized.append( { "title": self._truncate(item.get("title") or item.get("source") or f"参考依据 {index}", 80), "content": self._truncate( item.get("content") or item.get("text") or item.get("chunk") or item.get("summary") or "", 180, ), "source": self._truncate(item.get("source") or item.get("source_type") or "knowledge_base", 80), } ) elif item: normalized.append({"title": f"参考依据 {index}", "content": self._truncate(str(item), 180), "source": "llm_output"}) return normalized[:6] def _ensure_list(self, value: object) -> list: """列表规整:拆分模型常见的 1/2/3 编号长文本。""" if value is None: return [] raw_items = value if isinstance(value, list) else [value] items: list = [] for raw in raw_items: if raw is None: continue if isinstance(raw, dict): items.append(raw) continue text = str(raw).strip() if not text: continue parts = re.split(r"(?:^|\s)(?:\d+[\.\、)]|[;;]\d+[;;])\s*", text) parts = [part.strip(";;、\n\t ") for part in parts if part and part.strip(";;、\n\t ")] items.extend(parts if len(parts) > 1 else [text]) return [self._truncate(item, 180) if not isinstance(item, dict) else item for item in items[:8]] def _fallback_score(self, score_type: str, guideline_refs: list[dict]) -> dict: """评分兜底:LLM 失败时生成稳定结构化评分,保证报告流程可展示。""" data = { "score_type": "percentage", "total_score": 80, "dimension_scores": [ { "dimension": "信息获取", "score": 20, "max_score": 25, "comment": "完成主要症状追问,但儿科专科病史仍需补充。", "evidence": ["围绕发热、咳嗽、喘息等核心症状展开问诊。"], "deductions": ["既往喘息史、过敏史、疫苗接种史、家属照护能力等信息不够完整。"], "improvement": "下一次按主诉、现病史、既往史、个人史、家族史和家属顾虑逐项补全。", }, { "dimension": "分析推理", "score": 16, "max_score": 20, "comment": "诊断方向基本正确,但严重程度分层需要更清晰。", "evidence": ["主要诊断指向支气管肺炎。"], "deductions": ["鉴别诊断和严重程度判断未充分引用血氧、胸片和炎症指标。"], "improvement": "把症状、体征、影像、血氧和炎症指标逐条对应到诊断依据和病情分层。", }, { "dimension": "检查利用", "score": 12, "max_score": 15, "comment": "关键检查申请较完整,但检查结果解释仍可细化。", "evidence": ["胸片、血氧或炎症指标可支持肺炎诊断和严重程度判断。"], "deductions": ["对 SpO2、胸片异常和炎症指标的临床意义解释不够具体。"], "improvement": "在诊断依据中写明每项异常检查如何支持诊断、鉴别诊断和治疗决策。", }, { "dimension": "处置决策", "score": 16, "max_score": 20, "comment": "治疗原则基本完整,仍需补充监测和升级处理条件。", "evidence": ["治疗原则覆盖抗感染、止咳平喘、改善氧合和观察病情。"], "deductions": ["药物选择、门诊/住院判断、风险预案和随访节点不够具体。"], "improvement": "补充治疗适应证、监测指标、病情加重预案和复诊/住院指征。", }, { "dimension": "沟通人文", "score": 8, "max_score": 10, "comment": "有基本沟通意识,但家属顾虑回应不够结构化。", "evidence": ["治疗方案中包含向家属说明病情和复诊指征。"], "deductions": ["知情同意、家属担心、用药注意事项和家庭护理教育仍需补充。"], "improvement": "增加对家属顾虑的回应、危险信号说明和家庭护理指导。", }, { "dimension": "临床整合", "score": 8, "max_score": 10, "comment": "完成训练闭环,证据衔接仍需强化。", "evidence": ["完成问诊、检查、诊断、治疗和评价流程。"], "deductions": ["各阶段证据之间的逻辑衔接和 SOAP 化表达不够清晰。"], "improvement": "用 SOAP 结构归纳病情,把证据、判断和计划串联起来。", }, ], "score_details": [ { "rule_id": None, "dimension": "信息获取", "score": 20, "deducted_reason": "既往喘息史、过敏史、疫苗接种史、家属照护能力等信息不够完整。", "evidence_message_ids": ["围绕发热、咳嗽、喘息等核心症状展开问诊。"], "ai_confidence": 0.85, "comment": "完成主要症状追问,但儿科专科病史仍需补充。", }, { "rule_id": None, "dimension": "分析推理", "score": 16, "deducted_reason": "鉴别诊断和严重程度判断未充分引用血氧、胸片和炎症指标。", "evidence_message_ids": ["主要诊断指向支气管肺炎。"], "ai_confidence": 0.84, "comment": "诊断方向基本正确,但严重程度分层需要更清晰。", }, { "rule_id": None, "dimension": "检查利用", "score": 12, "deducted_reason": "对 SpO2、胸片异常和炎症指标的临床意义解释不够具体。", "evidence_message_ids": ["胸片、血氧或炎症指标可支持肺炎诊断和严重程度判断。"], "ai_confidence": 0.84, "comment": "关键检查申请较完整,但检查结果解释仍可细化。", }, ], "errors": [ { "title": "信息采集不够系统", "description": "儿科特异性病史追问不足,影响对喘息诱因、感染风险和肺炎严重程度的判断。", "severity": "medium", "related_dimension": "信息获取", } ], "improvement_plan": [ "补充疫苗接种史、过敏史、既往喘息史、近期接触史和家庭照护能力评估。", "诊断依据中逐条引用发热、咳嗽、喘息、肺部体征、胸片、血氧和炎症指标。", "治疗方案中补充监测指标、病情加重预案、复诊/住院指征和家属健康教育。", "沟通环节增加家属担心回应、知情说明和家庭护理要点。", ], "evidence_summary": [ "问诊:已完成核心症状追问,但儿科特异性病史仍需补全。", "检查:需重点评价是否申请并利用胸片、血氧和炎症指标。", "诊断:主要诊断方向基本正确,鉴别诊断和严重程度分层需要更完整。", "治疗:治疗原则基本覆盖抗感染、平喘、氧合和观察。", "沟通:已有基本告知意识,仍需加强家属顾虑回应和健康教育。", ], "guideline_refs": guideline_refs, "overall_comment": "本次训练完成核心闭环,诊断和处理方向基本正确;后续需要强化问诊结构化、检查结果利用和治疗细化。", } return self._convert_to_five_point(data) if score_type == "five_point" else data def _convert_to_five_point(self, data: dict) -> dict: """分数转换:将百分制评价转换为五分制,同时保留证据、扣分和改进细则。""" converted = dict(data) converted["score_type"] = "five_point" converted["total_score"] = round(float(data.get("total_score", 0)) / 20, 1) converted["dimension_scores"] = [ { **item, "score": round(float(item.get("score", 0)) / float(item.get("max_score") or 100) * 5, 1), "max_score": 5, } for item in data.get("dimension_scores", []) ] converted["score_details"] = [ { **item, "score": round(float(item.get("score", 0)) / 20, 1), } for item in data.get("score_details", []) ] return converted def _truncate(self, value: Any, limit: int) -> str: """文本截断:限制模型输入和报告字段长度,降低评分耗时并稳定排版。""" if value is None: return "" text = str(value).strip() return text if len(text) <= limit else f"{text[:limit]}..."