Files
2026-06-08 16:49:45 +08:00

589 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
import re
import time
from typing import Any
from app.agents.llm_adapter import DeepSeekClient
from app.core.config import settings
from app.core.exceptions import AppError
from app.models.source_case import CaseBase
from app.models.training import SessionOrder, SessionSubmission, TrainingSession
# Module-level logger, named after this module (``__name__``).
logger = logging.getLogger(name=__name__)
class ScoringAgent:
    """Scoring agent: turns a case, the consultation transcript, exam results, the learner's
    submission and scoring rules into a structured evaluation.

    Both public entry points (``score`` and ``score_teaching``) try the fast LLM first and
    fall back to a deterministic, locally generated evaluation so the report can always render.
    """

    # Exceptions that send an LLM scoring attempt to the local fallback evaluation.
    # json.JSONDecodeError subclasses ValueError; it is listed explicitly for readability.
    _RECOVERABLE_ERRORS = (AppError, json.JSONDecodeError, KeyError, TypeError, ValueError)
    # Item markers in model output: "1." / "1、" / "1)" / "(1)", at the start of the text or after
    # whitespace or a semicolon. `\.(?!\d)` keeps decimals such as "38.5" from being split.
    _NUMBERED_ITEM_PATTERN = re.compile(r"(?:^|\s|[;；])(?:\d+(?:\.(?!\d)|[、)）])|[(（]\d+[)）])\s*")
    # Characters trimmed from each list item produced by _ensure_list.
    _ITEM_STRIP_CHARS = ";；、\n\t "
    # Clause separators used to cut free-text teaching fields into short evidence phrases.
    _PHRASE_SEPARATOR_PATTERN = re.compile(r"[；;，,。\n]+")
    # A JSON document wrapped in a Markdown code fence, e.g. ```json {...} ```.
    _JSON_FENCE_PATTERN = re.compile(r"^\s*```(?:json)?\s*(.*?)\s*```\s*$", re.DOTALL | re.IGNORECASE)

    def __init__(self, llm: DeepSeekClient | None = None) -> None:
        """Create the agent; ``llm`` defaults to a new ``DeepSeekClient``."""
        self.llm = llm or DeepSeekClient()

    async def score(
        self,
        session: TrainingSession,
        case: CaseBase,
        memory_messages: list[dict],
        orders: list[SessionOrder],
        submission: SessionSubmission,
        rubric: object | None,
        guideline_refs: list[dict],
        scoring_rules: list | None = None,
    ) -> dict:
        """Score a consultation training session.

        Asks the fast model for a JSON evaluation. If the call, JSON decoding or normalization
        raises one of ``_RECOVERABLE_ERRORS``, the fixed fallback evaluation is returned instead.

        Args:
            session: Training session; ``score_type`` selects the scale and ``id`` is logged.
            case: Case definition used as the reference answer.
            memory_messages: Conversation history; only the last 14 non-empty messages are sent.
            orders: Exam/lab orders placed during the session.
            submission: The learner's diagnosis and treatment submission.
            rubric: Accepted for interface compatibility; not used by this method.
            guideline_refs: Retrieved guideline snippets, sent to the model and kept in the result.
            scoring_rules: Optional scoring rules (at most 12 are sent).

        Returns:
            The normalized evaluation dict plus ``_llm_model`` and ``_latency_metrics`` keys.
        """
        start = time.perf_counter()
        messages = self._build_messages(session, case, memory_messages, orders, submission, guideline_refs, scoring_rules or [])
        try:
            return await self._request_llm_score(messages, session.score_type, guideline_refs, token_cap=1800)
        except self._RECOVERABLE_ERRORS as exc:
            logger.warning("scoring_agent.fallback session_id=%s error=%s", session.id, exc.__class__.__name__)
            data = self._fallback_score(session.score_type, guideline_refs)
            return self._mark_fallback(data, start, exc)

    async def score_teaching(
        self,
        *,
        case: CaseBase,
        teaching_payload: dict,
        scoring_rules: list,
        guideline_refs: list[dict],
        score_type: str,
    ) -> dict:
        """Score a teaching-interaction session (questions, reference answers, learner answers).

        Same LLM-then-fallback flow as ``score``. The fallback is computed from the
        multiple-choice accuracy found in ``teaching_payload["answer_results"]``.

        Returns:
            The normalized evaluation dict plus ``_llm_model`` and ``_latency_metrics`` keys.
        """
        start = time.perf_counter()
        messages = self._build_teaching_messages(case, teaching_payload, scoring_rules, guideline_refs, score_type)
        try:
            return await self._request_llm_score(messages, score_type, guideline_refs, token_cap=1600)
        except self._RECOVERABLE_ERRORS as exc:
            logger.warning("teaching_scoring_agent.fallback case_id=%s error=%s", case.id, exc.__class__.__name__)
            data = self._fallback_teaching_score(score_type, guideline_refs, teaching_payload)
            return self._mark_fallback(data, start, exc)

    async def _request_llm_score(
        self,
        messages: list[dict],
        score_type: str,
        guideline_refs: list[dict],
        *,
        token_cap: int,
    ) -> dict:
        """Call the fast model once and return its normalized evaluation.

        ``max_tokens`` is ``settings.llm_scoring_max_tokens`` capped at ``token_cap``. Errors
        from the client, JSON decoding or normalization propagate; callers decide on fallback.
        """
        response = await self.llm.chat(
            messages,
            settings.llm_fast_model,
            thinking_enabled=settings.llm_fast_thinking_enabled,
            reasoning_effort=None,
            response_format={"type": "json_object"} if settings.llm_scoring_json_response else None,
            max_tokens=min(settings.llm_scoring_max_tokens, token_cap),
        )
        data = self._parse_json_content(response.content)
        data = self._normalize_score_payload(data, score_type, guideline_refs)
        data["_llm_model"] = response.model
        data["_latency_metrics"] = {"scoring_latency_ms": response.latency_ms, "fallback_used": False}
        return data

    def _parse_json_content(self, content: object) -> object:
        """Decode the model reply as JSON, tolerating a surrounding Markdown code fence.

        Fences can appear when ``llm_scoring_json_response`` is off. Raises
        ``json.JSONDecodeError`` for invalid JSON and ``TypeError`` for non-text content
        such as ``None``, exactly as ``json.loads`` does.
        """
        text = content
        if isinstance(content, str):
            fenced = self._JSON_FENCE_PATTERN.match(content)
            if fenced:
                text = fenced.group(1)
        return json.loads(text)

    def _mark_fallback(self, data: dict, start: float, exc: Exception) -> dict:
        """Attach fallback model/latency metadata to ``data`` in place and return it.

        ``start`` is a ``time.perf_counter()`` reading taken before the attempt began.
        """
        data["_llm_model"] = f"local-fallback-{settings.llm_fast_model}"
        data["_latency_metrics"] = {
            "scoring_latency_ms": int((time.perf_counter() - start) * 1000),
            "fallback_used": True,
            "fallback_reason": exc.__class__.__name__,
        }
        return data

    def _build_teaching_messages(
        self,
        case: CaseBase,
        teaching_payload: dict,
        scoring_rules: list,
        guideline_refs: list[dict],
        score_type: str,
    ) -> list[dict]:
        """Build the teaching-evaluation prompt.

        Sends only the case fields, teaching payload, compacted scoring rules and guideline
        snippets that the teaching evaluation needs.
        """
        payload = {
            "score_type": score_type,
            "case": {
                "case_id": case.id,
                "title": case.title,
                "chief_complaint": case.chief_complaint,
                "description": self._truncate(case.description, 320),
                "knowledge_points": case.knowledge_points or [],
                "key_points": case.key_points or [],
            },
            "teaching": teaching_payload,
            "scoring_rules": self._compact_scoring_rules(scoring_rules),
            "guidelines": self._compact_guidelines(guideline_refs),
        }
        system = (
            "你是医学教学互动评价专家,只输出合法 JSON。"
            "请根据病例、教学目标、选择题、标准答案、解析文本、学生作答和评分规则生成教学评价。"
            "输出字段固定为 score_type,total_score,dimension_scores,errors,improvement_plan,"
            "evidence_summary,guideline_refs,overall_comment,score_details。"
            "dimension_scores 包含 知识掌握、临床推理、检查理解、治疗决策、人文沟通 维度,"
            "每项包含 dimension,score,max_score,comment,evidence,deductions,improvement。"
            "score_details 对应 scoring_rules 或题目维度,每项包含 rule_id,dimension,score,"
            "deducted_reason,evidence_message_ids,ai_confidence,comment。"
            "必须指出答对题目、答错题目、错误原因、下一步学习重点。"
            "本评价仅用于医学教学训练,不替代真实临床诊疗。"
        )
        return [{"role": "system", "content": system}, {"role": "user", "content": json.dumps(payload, ensure_ascii=False)}]

    def _fallback_teaching_score(self, score_type: str, guideline_refs: list[dict], teaching_payload: dict) -> dict:
        """Fallback teaching evaluation used when the LLM is unavailable.

        Scores by multiple-choice accuracy. Each ``teaching_payload["answer_results"]`` entry is
        read as a dict with ``is_correct`` and optional ``question_id``/``stem``; non-dict entries
        are ignored. Dimension scores are fixed fractions (35/25/15/15/10 %) of the total.
        """
        results = [item for item in (teaching_payload.get("answer_results") or []) if isinstance(item, dict)]
        total = len(results)
        correct = sum(1 for item in results if item.get("is_correct"))
        accuracy = correct / total if total else 0
        total_score = round(accuracy * 100, 1) if total else 0
        incorrect = [item for item in results if not item.get("is_correct")]
        incorrect_titles = [f"{item.get('question_id')}: {item.get('stem', '')}" for item in incorrect[:5]]
        data = {
            "score_type": "percentage",
            "total_score": total_score,
            "dimension_scores": [
                {
                    "dimension": "知识掌握",
                    "score": round(total_score * 0.35, 1),
                    "max_score": 35,
                    "comment": f"{total} 题,答对 {correct} 题。",
                    "evidence": [f"正确率 {round(accuracy * 100, 1)}%"],
                    "deductions": incorrect_titles,
                    "improvement": "复习错题对应知识点和病例解析。",
                },
                {
                    "dimension": "临床推理",
                    "score": round(total_score * 0.25, 1),
                    "max_score": 25,
                    "comment": "根据选择题表现评估临床判断链路。",
                    "evidence": [item.get("stem", "") for item in results[:3]],
                    "deductions": incorrect_titles,
                    "improvement": "把题目选项与病例主诉、体征和检查结果逐项对应。",
                },
                {
                    "dimension": "检查理解",
                    "score": round(total_score * 0.15, 1),
                    "max_score": 15,
                    "comment": "重点关注检查项目与病情严重程度判断。",
                    # Previously `.split("")`, which always raised ValueError (empty separator).
                    "evidence": self._split_phrases(teaching_payload.get("scoring_focus")),
                    "deductions": [],
                    "improvement": "理解血氧、胸片和炎症指标在肺炎评估中的作用。",
                },
                {
                    "dimension": "治疗决策",
                    "score": round(total_score * 0.15, 1),
                    "max_score": 15,
                    "comment": "根据题目表现评估治疗原则掌握情况。",
                    "evidence": self._split_phrases(teaching_payload.get("teaching_goal")),
                    "deductions": [],
                    "improvement": "复习抗感染、平喘、氧合监测和风险预案。",
                },
                {
                    "dimension": "人文沟通",
                    "score": round(total_score * 0.10, 1),
                    "max_score": 10,
                    "comment": "教学互动中需继续强化家属沟通和健康教育。",
                    "evidence": ["教学互动题包含沟通与健康教育相关内容。"],
                    "deductions": [],
                    "improvement": "向家属说明病情、观察指标、复诊指征和用药注意事项。",
                },
            ],
            "score_details": [],
            "errors": [
                {
                    "title": "教学题目答题错误",
                    # Separate titles so they do not run together in the report.
                    "description": "；".join(incorrect_titles) if incorrect_titles else "暂无明显错题。",
                    "severity": "medium" if incorrect_titles else "low",
                    "related_dimension": "知识掌握",
                }
            ],
            "improvement_plan": [
                "复盘错题解析,明确每个选项与病例证据的对应关系。",
                "把病例中的主诉、体征、检查和治疗原则整理成一条临床推理链。",
                "针对血氧、胸片、炎症指标和医患沟通进行专项复习。",
            ],
            "evidence_summary": [
                f"教学互动共提交 {total} 题,答对 {correct} 题。",
                "评分依据包括题目标准答案、解析文本、教学目标和评分规则。",
            ],
            "guideline_refs": guideline_refs,
            "overall_comment": f"本次教学互动正确率为 {round(accuracy * 100, 1)}%,请结合错题解析继续巩固病例关键知识点。",
        }
        return self._convert_to_five_point(data) if score_type == "five_point" else data

    def _split_phrases(self, value: object, limit: int = 3) -> list[str]:
        """Split a free-text field on clause separators into at most ``limit`` non-empty phrases.

        ``None`` or an empty value yields ``[]``.
        """
        phrases = [part.strip() for part in self._PHRASE_SEPARATOR_PATTERN.split(str(value or ""))]
        return [phrase for phrase in phrases if phrase][:limit]

    def _build_messages(
        self,
        session: TrainingSession,
        case: CaseBase,
        memory_messages: list[dict],
        orders: list[SessionOrder],
        submission: SessionSubmission,
        guideline_refs: list[dict],
        scoring_rules: list,
    ) -> list[dict]:
        """Build the consultation-scoring prompt.

        Only fields needed for evaluation are sent, and long texts are truncated so generation
        stays fast. The payload contains the last 14 non-empty conversation messages, every
        order, the case's reference diagnosis data, the submission, up to 12 scoring rules
        and up to 3 guideline snippets.
        """
        transcript_summary = [
            {"role": item.get("role"), "content": self._truncate(item.get("content", ""), 220)}
            for item in memory_messages[-14:]
            if item.get("content")
        ]
        ordered_payload = [
            {
                "item_code": order.item_code,
                "item_name": order.item_name,
                "result_text": self._truncate(order.result_text, 180),
                "is_key": order.is_key,
                "is_abnormal": order.is_abnormal,
            }
            for order in orders
        ]
        payload = {
            "score_type": session.score_type,
            "case": {
                "title": case.title,
                "chief_complaint": case.chief_complaint,
                "standard_diagnosis": case.diagnosis_primary,
                "diagnosis_basis": self._truncate(case.diagnosis_basis, 260),
                "key_symptoms": case.key_symptoms or [],
                "key_exams": case.key_exams or [],
                "key_points": case.key_points or [],
            },
            "conversation": transcript_summary,
            "orders": ordered_payload,
            "submission": {
                "primary_diagnosis": submission.primary_diagnosis,
                "differential_diagnoses": submission.differential_diagnoses or [],
                "diagnosis_basis": self._truncate(submission.diagnosis_basis, 320),
                "treatment_principle": self._truncate(submission.treatment_principle, 220),
                "treatment_measures": self._truncate(submission.treatment_measures, 320),
                "risk_plan": self._truncate(submission.risk_plan, 220),
                "communication": self._truncate(submission.communication, 220),
                "follow_up": self._truncate(submission.follow_up, 180),
            },
            "scoring_rules": self._compact_scoring_rules(scoring_rules),
            "guidelines": self._compact_guidelines(guideline_refs),
        }
        system = (
            "你是医学教学问诊评分专家,只输出合法 JSON。"
            "请结合病例、问诊过程、检查申请、诊断和治疗提交进行教学评价。"
            "输出字段固定为 score_type,total_score,dimension_scores,errors,improvement_plan,"
            "evidence_summary,guideline_refs,overall_comment,score_details。"
            "dimension_scores 为 5-6 项,每项包含 dimension,score,max_score,comment,evidence,deductions,improvement。"
            "score_details 对应 scoring_rules,每项包含 rule_id,dimension,score,deducted_reason,evidence_message_ids,ai_confidence,comment。"
            "evidence、deductions、improvement_plan、evidence_summary 必须是数组,每个元素一句话。"
            "errors 每项包含 title,description,severity,related_dimension。"
            "评价必须具体指出用户问了什么、申请了什么检查、诊断治疗哪里充分或不足。"
            "报告仅用于教学训练,不替代真实临床诊疗。"
        )
        return [{"role": "system", "content": system}, {"role": "user", "content": json.dumps(payload, ensure_ascii=False)}]

    def _rule_field(self, rule: object, name: str, default: Any) -> Any:
        """Read ``name`` from a scoring rule given either as an attribute object or a plain dict."""
        if isinstance(rule, dict):
            return rule.get(name, default)
        return getattr(rule, name, default)

    def _compact_scoring_rules(self, scoring_rules: list) -> list[dict]:
        """Compact up to 12 scoring rules into the fields the scoring prompt uses.

        Rules may be attribute objects (e.g. ORM rows) or dicts. Missing fields get defaults,
        and ``None`` is treated as an empty rule list.
        """
        compact = []
        for rule in (scoring_rules or [])[:12]:
            compact.append(
                {
                    "rule_id": self._rule_field(rule, "id", None),
                    "dimension": self._rule_field(rule, "dimension", ""),
                    "competency_dimension": self._rule_field(rule, "competency_dimension", ""),
                    "score_weight": float(self._rule_field(rule, "score_weight", 0) or 0),
                    "ai_auto_score": bool(self._rule_field(rule, "ai_auto_score", True)),
                    "osce_dimension": bool(self._rule_field(rule, "osce_dimension", False)),
                    "scoring_standard": self._truncate(self._rule_field(rule, "scoring_standard", ""), 240),
                    "rubric_json": self._rule_field(rule, "rubric_json", {}) or {},
                }
            )
        return compact

    def _compact_guidelines(self, guideline_refs: list[dict]) -> list[dict]:
        """Keep up to 3 dict guideline refs as short title/content pairs for the model to cite."""
        compact = []
        for item in (guideline_refs or [])[:3]:
            if isinstance(item, dict):
                compact.append(
                    {
                        "title": item.get("title") or item.get("source"),
                        "content": self._truncate(item.get("content") or item.get("chunk"), 180),
                    }
                )
        return compact

    def _normalize_score_payload(self, data: dict, score_type: str, guideline_refs: list[dict]) -> dict:
        """Validate a model-produced evaluation and fill in missing fields.

        Non-dict input is replaced by the fallback evaluation. Numeric fields go through
        ``float()``, so non-numeric values raise ``ValueError``/``TypeError``; the public entry
        points treat that as a reason to fall back. Numbered long strings are split into lists,
        and the result is converted to the requested ``score_type``.
        """
        if not isinstance(data, dict):
            data = self._fallback_score(score_type, guideline_refs)
        data.setdefault("score_type", "percentage")
        data.setdefault("total_score", 0)
        data.setdefault("dimension_scores", [])
        data.setdefault("score_details", [])
        data.setdefault("errors", [])
        data.setdefault("improvement_plan", [])
        data.setdefault("evidence_summary", [])
        data.setdefault("guideline_refs", guideline_refs)
        data.setdefault("overall_comment", "")
        # Coerce to a number so string totals such as "85" do not leak into the report.
        data["total_score"] = float(data.get("total_score") or 0)
        normalized_dimensions = []
        for item in data.get("dimension_scores") or []:
            if not isinstance(item, dict):
                continue
            normalized_dimensions.append(
                {
                    "dimension": str(item.get("dimension") or "未命名维度"),
                    "score": float(item.get("score") or 0),
                    "max_score": float(item.get("max_score") or (100 if data.get("score_type") == "percentage" else 5)),
                    "comment": self._truncate(item.get("comment") or "", 180),
                    "evidence": self._ensure_list(item.get("evidence")),
                    "deductions": self._ensure_list(item.get("deductions")),
                    "improvement": self._truncate(item.get("improvement") or "", 180),
                }
            )
        data["dimension_scores"] = normalized_dimensions or self._fallback_score("percentage", guideline_refs)["dimension_scores"]
        data["score_details"] = self._normalize_score_details(data.get("score_details"), data["dimension_scores"])
        data["errors"] = self._normalize_errors(data.get("errors"))
        data["improvement_plan"] = self._ensure_list(data.get("improvement_plan"))
        data["evidence_summary"] = self._ensure_list(data.get("evidence_summary"))
        data["guideline_refs"] = self._normalize_guideline_refs(data.get("guideline_refs"), guideline_refs)
        data["overall_comment"] = self._truncate(data.get("overall_comment") or "", 260)
        if score_type == "five_point" and data.get("score_type") != "five_point":
            return self._convert_to_five_point(data)
        if score_type == "percentage" and data.get("score_type") != "percentage":
            data["score_type"] = "percentage"
        return data

    def _normalize_score_details(self, raw_details: object, dimension_scores: list[dict]) -> list[dict]:
        """Normalize per-rule score details into rows ready to persist.

        Falls back to ``dimension_scores`` when the model supplied no non-empty detail list.
        A missing ``deducted_reason`` is built from the item's ``deductions``.
        """
        source = raw_details if isinstance(raw_details, list) and raw_details else dimension_scores
        details = []
        for item in source:
            if not isinstance(item, dict):
                continue
            deducted_reason = item.get("deducted_reason")
            if not deducted_reason:
                # _ensure_list stops a bare string from being joined character by character.
                deductions = self._ensure_list(item.get("deductions"))
                deducted_reason = "；".join(str(value) for value in deductions if value)
            details.append(
                {
                    "rule_id": item.get("rule_id"),
                    "dimension": str(item.get("dimension") or "综合表现"),
                    "score": float(item.get("score") or 0),
                    "deducted_reason": self._truncate(deducted_reason or "", 260),
                    "evidence_message_ids": self._ensure_list(item.get("evidence_message_ids") or item.get("evidence")),
                    "ai_confidence": float(item.get("ai_confidence") or 0.85),
                    "comment": self._truncate(item.get("comment") or item.get("improvement") or "", 220),
                }
            )
        return details

    def _normalize_errors(self, errors: object) -> list[dict]:
        """Normalize error items into at most 6 renderable deduction entries.

        Dict items keep their fields, with defaults filled in; any other item becomes the description.
        """
        normalized = []
        for index, item in enumerate(self._ensure_list(errors), start=1):
            if isinstance(item, dict):
                normalized.append(
                    {
                        "title": self._truncate(item.get("title") or f"问题 {index}", 60),
                        "description": self._truncate(item.get("description") or item.get("comment") or "", 180),
                        "severity": item.get("severity") or "medium",
                        "related_dimension": item.get("related_dimension") or item.get("dimension") or "综合表现",
                    }
                )
            else:
                normalized.append(
                    {
                        "title": f"问题 {index}",
                        "description": self._truncate(str(item), 180),
                        "severity": "medium",
                        "related_dimension": "综合表现",
                    }
                )
        return normalized[:6]

    def _normalize_guideline_refs(self, value: object, fallback_refs: list[dict]) -> list[dict]:
        """Normalize guideline references into at most 6 title/content/source dicts.

        Accepts a list, a single string or dict, or an empty value. Empty values, including an
        empty list, fall back to the retrieved ``fallback_refs``.
        """
        if isinstance(value, list) and value:
            raw_items = value
        elif value and not isinstance(value, list):
            raw_items = [value]
        else:
            raw_items = fallback_refs or []
        normalized = []
        for index, item in enumerate(raw_items, start=1):
            if isinstance(item, dict):
                normalized.append(
                    {
                        "title": self._truncate(item.get("title") or item.get("source") or f"参考依据 {index}", 80),
                        "content": self._truncate(
                            item.get("content") or item.get("text") or item.get("chunk") or item.get("summary") or "",
                            180,
                        ),
                        "source": self._truncate(item.get("source") or item.get("source_type") or "knowledge_base", 80),
                    }
                )
            elif item:
                normalized.append({"title": f"参考依据 {index}", "content": self._truncate(str(item), 180), "source": "llm_output"})
        return normalized[:6]

    def _ensure_list(self, value: object) -> list:
        """Coerce a model field into a list of at most 8 items.

        ``None`` gives ``[]``, and a non-list value is wrapped. A string containing numbered items
        ("1. a 2. b", "1、a；2、b", "(1) a (2) b") is split into separate items, but only when
        that yields more than one. Dict items are kept as-is; string items are truncated to 180
        characters.
        """
        if value is None:
            return []
        raw_items = value if isinstance(value, list) else [value]
        items: list = []
        for raw in raw_items:
            if raw is None:
                continue
            if isinstance(raw, dict):
                items.append(raw)
                continue
            text = str(raw).strip()
            if not text:
                continue
            parts = [
                part.strip(self._ITEM_STRIP_CHARS)
                for part in self._NUMBERED_ITEM_PATTERN.split(text)
                if part and part.strip(self._ITEM_STRIP_CHARS)
            ]
            items.extend(parts if len(parts) > 1 else [text])
        return [item if isinstance(item, dict) else self._truncate(item, 180) for item in items[:8]]

    def _fallback_score(self, score_type: str, guideline_refs: list[dict]) -> dict:
        """Fixed fallback evaluation used when the LLM fails, so the report can still render.

        The template text is hard-coded (it refers to pediatric bronchopneumonia findings) and
        does not depend on the session. Converted to five-point when requested.
        """
        data = {
            "score_type": "percentage",
            "total_score": 80,
            "dimension_scores": [
                {
                    "dimension": "信息获取",
                    "score": 20,
                    "max_score": 25,
                    "comment": "完成主要症状追问,但儿科专科病史仍需补充。",
                    "evidence": ["围绕发热、咳嗽、喘息等核心症状展开问诊。"],
                    "deductions": ["既往喘息史、过敏史、疫苗接种史、家属照护能力等信息不够完整。"],
                    "improvement": "下一次按主诉、现病史、既往史、个人史、家族史和家属顾虑逐项补全。",
                },
                {
                    "dimension": "分析推理",
                    "score": 16,
                    "max_score": 20,
                    "comment": "诊断方向基本正确,但严重程度分层需要更清晰。",
                    "evidence": ["主要诊断指向支气管肺炎。"],
                    "deductions": ["鉴别诊断和严重程度判断未充分引用血氧、胸片和炎症指标。"],
                    "improvement": "把症状、体征、影像、血氧和炎症指标逐条对应到诊断依据和病情分层。",
                },
                {
                    "dimension": "检查利用",
                    "score": 12,
                    "max_score": 15,
                    "comment": "关键检查申请较完整,但检查结果解释仍可细化。",
                    "evidence": ["胸片、血氧或炎症指标可支持肺炎诊断和严重程度判断。"],
                    "deductions": ["对 SpO2、胸片异常和炎症指标的临床意义解释不够具体。"],
                    "improvement": "在诊断依据中写明每项异常检查如何支持诊断、鉴别诊断和治疗决策。",
                },
                {
                    "dimension": "处置决策",
                    "score": 16,
                    "max_score": 20,
                    "comment": "治疗原则基本完整,仍需补充监测和升级处理条件。",
                    "evidence": ["治疗原则覆盖抗感染、止咳平喘、改善氧合和观察病情。"],
                    "deductions": ["药物选择、门诊/住院判断、风险预案和随访节点不够具体。"],
                    "improvement": "补充治疗适应证、监测指标、病情加重预案和复诊/住院指征。",
                },
                {
                    "dimension": "沟通人文",
                    "score": 8,
                    "max_score": 10,
                    "comment": "有基本沟通意识,但家属顾虑回应不够结构化。",
                    "evidence": ["治疗方案中包含向家属说明病情和复诊指征。"],
                    "deductions": ["知情同意、家属担心、用药注意事项和家庭护理教育仍需补充。"],
                    "improvement": "增加对家属顾虑的回应、危险信号说明和家庭护理指导。",
                },
                {
                    "dimension": "临床整合",
                    "score": 8,
                    "max_score": 10,
                    "comment": "完成训练闭环,证据衔接仍需强化。",
                    "evidence": ["完成问诊、检查、诊断、治疗和评价流程。"],
                    "deductions": ["各阶段证据之间的逻辑衔接和 SOAP 化表达不够清晰。"],
                    "improvement": "用 SOAP 结构归纳病情,把证据、判断和计划串联起来。",
                },
            ],
            "score_details": [
                {
                    "rule_id": None,
                    "dimension": "信息获取",
                    "score": 20,
                    "deducted_reason": "既往喘息史、过敏史、疫苗接种史、家属照护能力等信息不够完整。",
                    "evidence_message_ids": ["围绕发热、咳嗽、喘息等核心症状展开问诊。"],
                    "ai_confidence": 0.85,
                    "comment": "完成主要症状追问,但儿科专科病史仍需补充。",
                },
                {
                    "rule_id": None,
                    "dimension": "分析推理",
                    "score": 16,
                    "deducted_reason": "鉴别诊断和严重程度判断未充分引用血氧、胸片和炎症指标。",
                    "evidence_message_ids": ["主要诊断指向支气管肺炎。"],
                    "ai_confidence": 0.84,
                    "comment": "诊断方向基本正确,但严重程度分层需要更清晰。",
                },
                {
                    "rule_id": None,
                    "dimension": "检查利用",
                    "score": 12,
                    "deducted_reason": "对 SpO2、胸片异常和炎症指标的临床意义解释不够具体。",
                    "evidence_message_ids": ["胸片、血氧或炎症指标可支持肺炎诊断和严重程度判断。"],
                    "ai_confidence": 0.84,
                    "comment": "关键检查申请较完整,但检查结果解释仍可细化。",
                },
            ],
            "errors": [
                {
                    "title": "信息采集不够系统",
                    "description": "儿科特异性病史追问不足,影响对喘息诱因、感染风险和肺炎严重程度的判断。",
                    "severity": "medium",
                    "related_dimension": "信息获取",
                }
            ],
            "improvement_plan": [
                "补充疫苗接种史、过敏史、既往喘息史、近期接触史和家庭照护能力评估。",
                "诊断依据中逐条引用发热、咳嗽、喘息、肺部体征、胸片、血氧和炎症指标。",
                "治疗方案中补充监测指标、病情加重预案、复诊/住院指征和家属健康教育。",
                "沟通环节增加家属担心回应、知情说明和家庭护理要点。",
            ],
            "evidence_summary": [
                "问诊:已完成核心症状追问,但儿科特异性病史仍需补全。",
                "检查:需重点评价是否申请并利用胸片、血氧和炎症指标。",
                "诊断:主要诊断方向基本正确,鉴别诊断和严重程度分层需要更完整。",
                "治疗:治疗原则基本覆盖抗感染、平喘、氧合和观察。",
                "沟通:已有基本告知意识,仍需加强家属顾虑回应和健康教育。",
            ],
            "guideline_refs": guideline_refs,
            "overall_comment": "本次训练完成核心闭环,诊断和处理方向基本正确;后续需要强化问诊结构化、检查结果利用和治疗细化。",
        }
        return self._convert_to_five_point(data) if score_type == "five_point" else data

    def _convert_to_five_point(self, data: dict) -> dict:
        """Convert a percentage-scale evaluation to a five-point scale, returning a shallow copy.

        The total is divided by 20. Each dimension is rescaled by its own ``max_score`` (default
        100). Detail rows have no ``max_score`` of their own, so each is rescaled by the
        ``max_score`` of the dimension with the same name, or by 100 if none matches. Evidence,
        deductions and comments are kept.
        """
        dimensions = data.get("dimension_scores", [])
        dimension_max = {item.get("dimension"): float(item.get("max_score") or 100) for item in dimensions}
        converted = dict(data)
        converted["score_type"] = "five_point"
        converted["total_score"] = round(float(data.get("total_score") or 0) / 20, 1)
        converted["dimension_scores"] = [
            {
                **item,
                "score": round(float(item.get("score") or 0) / float(item.get("max_score") or 100) * 5, 1),
                "max_score": 5,
            }
            for item in dimensions
        ]
        converted["score_details"] = [
            {
                **item,
                "score": round(float(item.get("score") or 0) / dimension_max.get(item.get("dimension"), 100.0) * 5, 1),
            }
            for item in data.get("score_details", [])
        ]
        return converted

    def _truncate(self, value: Any, limit: int) -> str:
        """Stringify and strip ``value``, cutting it to ``limit`` chars plus "..." when longer.

        ``None`` becomes "". Keeps prompts short and report fields a stable length.
        """
        if value is None:
            return ""
        text = str(value).strip()
        return text if len(text) <= limit else f"{text[:limit]}..."